aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorDongjoon Hyun <dongjoon@apache.org>2016-04-12 00:43:28 -0700
committerReynold Xin <rxin@databricks.com>2016-04-12 00:43:28 -0700
commitb0f5497e9520575e5082fa8ce8be5569f43abe74 (patch)
treeefd349be7227cf20616712fc7376b7c2f11f6614 /core
parent678b96e77bf77a64b8df14b19db5a3bb18febfe3 (diff)
downloadspark-b0f5497e9520575e5082fa8ce8be5569f43abe74.tar.gz
spark-b0f5497e9520575e5082fa8ce8be5569f43abe74.tar.bz2
spark-b0f5497e9520575e5082fa8ce8be5569f43abe74.zip
[SPARK-14508][BUILD] Add a new ScalaStyle Rule `OmitBracesInCase`
## What changes were proposed in this pull request? According to the [Spark Code Style Guide](https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide) and [Scala Style Guide](http://docs.scala-lang.org/style/control-structures.html#curlybraces), we had better enforce the following rule. ``` case: Always omit braces in case clauses. ``` This PR makes a new ScalaStyle rule, 'OmitBracesInCase', and enforces it to the code. ## How was this patch tested? Pass the Jenkins tests (including Scala style checking) Author: Dongjoon Hyun <dongjoon@apache.org> Closes #12280 from dongjoon-hyun/SPARK-14508.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala12
-rw-r--r--core/src/main/scala/org/apache/spark/SparkEnv.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala12
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala48
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala6
-rwxr-xr-xcore/src/main/scala/org/apache/spark/deploy/worker/Worker.scala12
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala9
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala9
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/serializer/Serializer.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/util/EventLoop.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/util/SizeEstimator.scala3
-rw-r--r--core/src/test/scala/org/apache/spark/DistributedSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala6
-rw-r--r--core/src/test/scala/org/apache/spark/UnpersistSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala15
35 files changed, 74 insertions, 141 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f0d152f05a..966198dd5e 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2397,9 +2397,8 @@ object SparkContext extends Logging {
} catch {
// TODO: Enumerate the exact reasons why it can fail
// But irrespective of it, it means we cannot proceed !
- case e: Exception => {
+ case e: Exception =>
throw new SparkException("YARN mode not available ?", e)
- }
}
val backend = try {
val clazz =
@@ -2407,9 +2406,8 @@ object SparkContext extends Logging {
val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
} catch {
- case e: Exception => {
+ case e: Exception =>
throw new SparkException("YARN mode not available ?", e)
- }
}
scheduler.initialize(backend)
(backend, scheduler)
@@ -2421,9 +2419,8 @@ object SparkContext extends Logging {
cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
} catch {
- case e: Exception => {
+ case e: Exception =>
throw new SparkException("YARN mode not available ?", e)
- }
}
val backend = try {
@@ -2432,9 +2429,8 @@ object SparkContext extends Logging {
val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
} catch {
- case e: Exception => {
+ case e: Exception =>
throw new SparkException("YARN mode not available ?", e)
- }
}
scheduler.initialize(backend)
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index ab89f4c4e4..3d11db7461 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -101,14 +101,13 @@ class SparkEnv (
// We only need to delete the tmp dir create by driver, because sparkFilesDir is point to the
// current working dir in executor which we do not need to delete.
driverTmpDirToDelete match {
- case Some(path) => {
+ case Some(path) =>
try {
Utils.deleteRecursively(new File(path))
} catch {
case e: Exception =>
logWarning(s"Exception while deleting Spark temp dir: $path", e)
}
- }
case None => // We just need to delete tmp dir created by driver, so do nothing on executor
}
}
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
index 6f6730690f..6259bead3e 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -134,11 +134,10 @@ private[python] class JavaToWritableConverter extends Converter[Any, Writable] {
mapWritable.put(convertToWritable(k), convertToWritable(v))
}
mapWritable
- case array: Array[Any] => {
+ case array: Array[Any] =>
val arrayWriteable = new ArrayWritable(classOf[Writable])
arrayWriteable.set(array.map(convertToWritable(_)))
arrayWriteable
- }
case other => throw new SparkException(
s"Data of type ${other.getClass.getName} cannot be used")
}
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 4bca16a234..ab5b6c8380 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -470,7 +470,7 @@ private[spark] object PythonRDD extends Logging {
objs.append(obj)
}
} catch {
- case eof: EOFException => {}
+ case eof: EOFException => // No-op
}
JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
} finally {
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 41ac308808..cda9d38c6a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -152,10 +152,9 @@ class SparkHadoopUtil extends Logging {
val baselineBytesRead = f()
Some(() => f() - baselineBytesRead)
} catch {
- case e @ (_: NoSuchMethodException | _: ClassNotFoundException) => {
+ case e @ (_: NoSuchMethodException | _: ClassNotFoundException) =>
logDebug("Couldn't find method for retrieving thread-level FileSystem input data", e)
None
- }
}
}
@@ -174,10 +173,9 @@ class SparkHadoopUtil extends Logging {
val baselineBytesWritten = f()
Some(() => f() - baselineBytesWritten)
} catch {
- case e @ (_: NoSuchMethodException | _: ClassNotFoundException) => {
+ case e @ (_: NoSuchMethodException | _: ClassNotFoundException) =>
logDebug("Couldn't find method for retrieving thread-level FileSystem output data", e)
None
- }
}
}
@@ -315,7 +313,7 @@ class SparkHadoopUtil extends Logging {
*/
def substituteHadoopVariables(text: String, hadoopConf: Configuration): String = {
text match {
- case HADOOP_CONF_PATTERN(matched) => {
+ case HADOOP_CONF_PATTERN(matched) =>
logDebug(text + " matched " + HADOOP_CONF_PATTERN)
val key = matched.substring(13, matched.length() - 1) // remove ${hadoopconf- .. }
val eval = Option[String](hadoopConf.get(key))
@@ -330,11 +328,9 @@ class SparkHadoopUtil extends Logging {
// Continue to substitute more variables.
substituteHadoopVariables(eval.get, hadoopConf)
}
- }
- case _ => {
+ case _ =>
logDebug(text + " didn't match " + HADOOP_CONF_PATTERN)
text
- }
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 01901bbf85..9bd3fc1033 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -217,7 +217,7 @@ private[deploy] class Master(
}
override def receive: PartialFunction[Any, Unit] = {
- case ElectedLeader => {
+ case ElectedLeader =>
val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
RecoveryState.ALIVE
@@ -233,16 +233,14 @@ private[deploy] class Master(
}
}, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
}
- }
case CompleteRecovery => completeRecovery()
- case RevokedLeadership => {
+ case RevokedLeadership =>
logError("Leadership has been revoked -- master shutting down.")
System.exit(0)
- }
- case RegisterApplication(description, driver) => {
+ case RegisterApplication(description, driver) =>
// TODO Prevent repeated registrations from some driver
if (state == RecoveryState.STANDBY) {
// ignore, don't send response
@@ -255,12 +253,11 @@ private[deploy] class Master(
driver.send(RegisteredApplication(app.id, self))
schedule()
}
- }
- case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
+ case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
execOption match {
- case Some(exec) => {
+ case Some(exec) =>
val appInfo = idToApp(appId)
val oldState = exec.state
exec.state = state
@@ -298,22 +295,19 @@ private[deploy] class Master(
}
}
}
- }
case None =>
logWarning(s"Got status update for unknown executor $appId/$execId")
}
- }
- case DriverStateChanged(driverId, state, exception) => {
+ case DriverStateChanged(driverId, state, exception) =>
state match {
case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
removeDriver(driverId, state, exception)
case _ =>
throw new Exception(s"Received unexpected state update for driver $driverId: $state")
}
- }
- case Heartbeat(workerId, worker) => {
+ case Heartbeat(workerId, worker) =>
idToWorker.get(workerId) match {
case Some(workerInfo) =>
workerInfo.lastHeartbeat = System.currentTimeMillis()
@@ -327,9 +321,8 @@ private[deploy] class Master(
" This worker was never registered, so ignoring the heartbeat.")
}
}
- }
- case MasterChangeAcknowledged(appId) => {
+ case MasterChangeAcknowledged(appId) =>
idToApp.get(appId) match {
case Some(app) =>
logInfo("Application has been re-registered: " + appId)
@@ -339,9 +332,8 @@ private[deploy] class Master(
}
if (canCompleteRecovery) { completeRecovery() }
- }
- case WorkerSchedulerStateResponse(workerId, executors, driverIds) => {
+ case WorkerSchedulerStateResponse(workerId, executors, driverIds) =>
idToWorker.get(workerId) match {
case Some(worker) =>
logInfo("Worker has been re-registered: " + workerId)
@@ -367,7 +359,6 @@ private[deploy] class Master(
}
if (canCompleteRecovery) { completeRecovery() }
- }
case WorkerLatestState(workerId, executors, driverIds) =>
idToWorker.get(workerId) match {
@@ -397,9 +388,8 @@ private[deploy] class Master(
logInfo(s"Received unregister request from application $applicationId")
idToApp.get(applicationId).foreach(finishApplication)
- case CheckForWorkerTimeOut => {
+ case CheckForWorkerTimeOut =>
timeOutDeadWorkers()
- }
case AttachCompletedRebuildUI(appId) =>
// An asyncRebuildSparkUI has completed, so need to attach to master webUi
@@ -408,7 +398,7 @@ private[deploy] class Master(
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RegisterWorker(
- id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) => {
+ id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) =>
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
workerHost, workerPort, cores, Utils.megabytesToString(memory)))
if (state == RecoveryState.STANDBY) {
@@ -430,9 +420,8 @@ private[deploy] class Master(
+ workerAddress))
}
}
- }
- case RequestSubmitDriver(description) => {
+ case RequestSubmitDriver(description) =>
if (state != RecoveryState.ALIVE) {
val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
"Can only accept driver submissions in ALIVE state."
@@ -451,9 +440,8 @@ private[deploy] class Master(
context.reply(SubmitDriverResponse(self, true, Some(driver.id),
s"Driver successfully submitted as ${driver.id}"))
}
- }
- case RequestKillDriver(driverId) => {
+ case RequestKillDriver(driverId) =>
if (state != RecoveryState.ALIVE) {
val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
s"Can only kill drivers in ALIVE state."
@@ -484,9 +472,8 @@ private[deploy] class Master(
context.reply(KillDriverResponse(self, driverId, success = false, msg))
}
}
- }
- case RequestDriverStatus(driverId) => {
+ case RequestDriverStatus(driverId) =>
if (state != RecoveryState.ALIVE) {
val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
"Can only request driver status in ALIVE state."
@@ -501,18 +488,15 @@ private[deploy] class Master(
context.reply(DriverStatusResponse(found = false, None, None, None, None))
}
}
- }
- case RequestMasterState => {
+ case RequestMasterState =>
context.reply(MasterStateResponse(
address.host, address.port, restServerBoundPort,
workers.toArray, apps.toArray, completedApps.toArray,
drivers.toArray, completedDrivers.toArray, state))
- }
- case BoundPortsRequest => {
+ case BoundPortsRequest =>
context.reply(BoundPortsResponse(address.port, webUi.boundPort, restServerBoundPort))
- }
case RequestExecutors(appId, requestedTotal) =>
context.reply(handleRequestExecutors(appId, requestedTotal))
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
index 9cd7458ba0..585e0839d0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
@@ -78,7 +78,7 @@ private[master] class MasterArguments(args: Array[String], conf: SparkConf) {
case ("--help") :: tail =>
printUsageAndExit(0)
- case Nil => {}
+ case Nil => // No-op
case _ =>
printUsageAndExit(1)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
index 79f77212fe..af850e4871 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -70,11 +70,10 @@ private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serializer
try {
Some(serializer.newInstance().deserialize[T](ByteBuffer.wrap(fileData)))
} catch {
- case e: Exception => {
+ case e: Exception =>
logWarning("Exception while reading persisted file, deleting", e)
zk.delete().forPath(WORKING_DIR + "/" + filename)
None
- }
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala
index b97805a28b..11e13441ee 100644
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala
@@ -76,14 +76,13 @@ private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf:
case ("--help") :: tail =>
printUsageAndExit(0)
- case Nil => {
+ case Nil =>
if (masterUrl == null) {
// scalastyle:off println
System.err.println("--master is required")
// scalastyle:on println
printUsageAndExit(1)
}
- }
case _ =>
printUsageAndExit(1)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index f9c92c3bb9..06066248ea 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -179,16 +179,14 @@ private[deploy] class ExecutorRunner(
val message = "Command exited with code " + exitCode
worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
} catch {
- case interrupted: InterruptedException => {
+ case interrupted: InterruptedException =>
logInfo("Runner thread for executor " + fullId + " interrupted")
state = ExecutorState.KILLED
killProcess(None)
- }
- case e: Exception => {
+ case e: Exception =>
logError("Error running executor", e)
state = ExecutorState.FAILED
killProcess(Some(e.toString))
- }
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 1b7637a39c..449beb0811 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -480,7 +480,7 @@ private[deploy] class Worker(
memoryUsed += memory_
sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
} catch {
- case e: Exception => {
+ case e: Exception =>
logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
if (executors.contains(appId + "/" + execId)) {
executors(appId + "/" + execId).kill()
@@ -488,7 +488,6 @@ private[deploy] class Worker(
}
sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
Some(e.toString), None))
- }
}
}
@@ -509,7 +508,7 @@ private[deploy] class Worker(
}
}
- case LaunchDriver(driverId, driverDesc) => {
+ case LaunchDriver(driverId, driverDesc) =>
logInfo(s"Asked to launch driver $driverId")
val driver = new DriverRunner(
conf,
@@ -525,9 +524,8 @@ private[deploy] class Worker(
coresUsed += driverDesc.cores
memoryUsed += driverDesc.mem
- }
- case KillDriver(driverId) => {
+ case KillDriver(driverId) =>
logInfo(s"Asked to kill driver $driverId")
drivers.get(driverId) match {
case Some(runner) =>
@@ -535,11 +533,9 @@ private[deploy] class Worker(
case None =>
logError(s"Asked to kill unknown driver $driverId")
}
- }
- case driverStateChanged @ DriverStateChanged(driverId, state, exception) => {
+ case driverStateChanged @ DriverStateChanged(driverId, state, exception) =>
handleDriverStateChanged(driverStateChanged)
- }
case ReregisterWithMaster =>
reregisterWithMaster()
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
index 391eb41190..777020d4d5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
@@ -165,12 +165,11 @@ private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) {
}
// scalastyle:on classforname
} catch {
- case e: Exception => {
+ case e: Exception =>
totalMb = 2*1024
// scalastyle:off println
System.out.println("Failed to get total physical memory. Using " + totalMb + " MB")
// scalastyle:on println
- }
}
// Leave out 1 GB for the operating system, but don't return a negative memory size
math.max(totalMb - 1024, Utils.DEFAULT_DRIVER_MEM_MB)
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index d4ed5845e7..71b4ad160d 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -62,10 +62,9 @@ private[spark] class CoarseGrainedExecutorBackend(
// This is a very fast action so we can use "ThreadUtils.sameThread"
case Success(msg) =>
// Always receive `true`. Just ignore it
- case Failure(e) => {
+ case Failure(e) =>
logError(s"Cannot register with driver: $driverUrl", e)
System.exit(1)
- }
}(ThreadUtils.sameThread)
}
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index 4da1017d28..0fed991049 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -196,10 +196,9 @@ private[spark] class MetricsSystem private (
sinks += sink.asInstanceOf[Sink]
}
} catch {
- case e: Exception => {
+ case e: Exception =>
logError("Sink class " + classPath + " cannot be instantiated")
throw e
- }
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala b/core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala
index c562c70aba..ab6aba6fc7 100644
--- a/core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala
+++ b/core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala
@@ -32,12 +32,11 @@ class BoundedDouble(val mean: Double, val confidence: Double, val low: Double, v
*/
override def equals(that: Any): Boolean =
that match {
- case that: BoundedDouble => {
+ case that: BoundedDouble =>
this.mean == that.mean &&
this.confidence == that.confidence &&
this.low == that.low &&
this.high == that.high
- }
case _ => false
}
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
index 5e9230e733..368916a39e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -166,8 +166,8 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
val counters = new Array[Long](buckets.length - 1)
while (iter.hasNext) {
bucketFunction(iter.next()) match {
- case Some(x: Int) => {counters(x) += 1}
- case _ => {}
+ case Some(x: Int) => counters(x) += 1
+ case _ => // No-Op
}
}
Iterator(counters)
diff --git a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
index 363004e587..a5992022d0 100644
--- a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
@@ -86,12 +86,11 @@ class OrderedRDDFunctions[K : Ordering : ClassTag,
def inRange(k: K): Boolean = ordering.gteq(k, lower) && ordering.lteq(k, upper)
val rddToFilter: RDD[P] = self.partitioner match {
- case Some(rp: RangePartitioner[K, V]) => {
+ case Some(rp: RangePartitioner[K, V]) =>
val partitionIndicies = (rp.getPartition(lower), rp.getPartition(upper)) match {
case (l, u) => Math.min(l, u) to Math.max(l, u)
}
PartitionPruningRDD.create(self, partitionIndicies.contains)
- }
case _ =>
self
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
index 582fa93afe..462fb39ea2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
@@ -128,7 +128,7 @@ private object ParallelCollectionRDD {
})
}
seq match {
- case r: Range => {
+ case r: Range =>
positions(r.length, numSlices).zipWithIndex.map({ case ((start, end), index) =>
// If the range is inclusive, use inclusive range for the last slice
if (r.isInclusive && index == numSlices - 1) {
@@ -138,8 +138,7 @@ private object ParallelCollectionRDD {
new Range(r.start + start * r.step, r.start + end * r.step, r.step)
}
}).toSeq.asInstanceOf[Seq[Seq[T]]]
- }
- case nr: NumericRange[_] => {
+ case nr: NumericRange[_] =>
// For ranges of Long, Double, BigInteger, etc
val slices = new ArrayBuffer[Seq[T]](numSlices)
var r = nr
@@ -149,14 +148,12 @@ private object ParallelCollectionRDD {
r = r.drop(sliceSize)
}
slices
- }
- case _ => {
+ case _ =>
val array = seq.toArray // To prevent O(n^2) operations for List etc
positions(array.length, numSlices).map({
case (start, end) =>
array.slice(start, end).toSeq
}).toSeq
- }
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
index 9e3880714a..c3579d761d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
@@ -78,11 +78,10 @@ class PartitionerAwareUnionRDD[T: ClassTag](
logDebug("Finding preferred location for " + this + ", partition " + s.index)
val parentPartitions = s.asInstanceOf[PartitionerAwareUnionRDDPartition].parents
val locations = rdds.zip(parentPartitions).flatMap {
- case (rdd, part) => {
+ case (rdd, part) =>
val parentLocations = currPrefLocs(rdd, part)
logDebug("Location of " + rdd + " partition " + part.index + " = " + parentLocations)
parentLocations
- }
}
val location = if (locations.isEmpty) {
None
diff --git a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
index 0640f26051..a6b032cc00 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
@@ -57,11 +57,10 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
// Since we are not doing canonicalization of path, this can be wrong : like relative vs
// absolute path .. which is fine, this is best case effort to remove duplicates - right ?
override def equals(other: Any): Boolean = other match {
- case that: InputFormatInfo => {
+ case that: InputFormatInfo =>
// not checking config - that should be fine, right ?
this.inputFormatClazz == that.inputFormatClazz &&
this.path == that.path
- }
case _ => false
}
@@ -86,10 +85,9 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
}
}
catch {
- case e: ClassNotFoundException => {
+ case e: ClassNotFoundException =>
throw new IllegalArgumentException("Specified inputformat " + inputFormatClazz +
" cannot be found ?", e)
- }
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala
index 6e9337bb90..bc1431835e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala
@@ -49,14 +49,13 @@ class SplitInfo(
// So unless there is identity equality between underlyingSplits, it will always fail even if it
// is pointing to same block.
override def equals(other: Any): Boolean = other match {
- case that: SplitInfo => {
+ case that: SplitInfo =>
this.hostLocation == that.hostLocation &&
this.inputFormatClazz == that.inputFormatClazz &&
this.path == that.path &&
this.length == that.length &&
// other split specific checks (like start for FileSplit)
this.underlyingSplit == that.underlyingSplit
- }
case _ => false
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index 873f1b56bd..ae7ef46abb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -133,7 +133,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
// if we can't deserialize the reason.
logError(
"Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader)
- case ex: Exception => {}
+ case ex: Exception => // No-op
}
scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 15d3515a02..6e08cdd87a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -188,20 +188,18 @@ private[spark] class TaskSetManager(
loc match {
case e: ExecutorCacheTaskLocation =>
pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
- case e: HDFSCacheTaskLocation => {
+ case e: HDFSCacheTaskLocation =>
val exe = sched.getExecutorsAliveOnHost(loc.host)
exe match {
- case Some(set) => {
+ case Some(set) =>
for (e <- set) {
pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
}
logInfo(s"Pending task $index has a cached location at ${e.host} " +
", where there are executors " + set.mkString(","))
- }
case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
", but there are no executors alive there.")
}
- }
case _ =>
}
pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
@@ -437,7 +435,7 @@ private[spark] class TaskSetManager(
}
dequeueTask(execId, host, allowedLocality) match {
- case Some((index, taskLocality, speculative)) => {
+ case Some((index, taskLocality, speculative)) =>
// Found a task; do some bookkeeping and return a task description
val task = tasks(index)
val taskId = sched.newTaskId()
@@ -486,7 +484,6 @@ private[spark] class TaskSetManager(
sched.dagScheduler.taskStarted(task, info)
return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
taskName, index, serializedTask))
- }
case _ =>
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
index 3971e6c382..61ab3e87c5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
@@ -121,11 +121,10 @@ private[spark] class ZookeeperMesosClusterPersistenceEngine(
Some(Utils.deserialize[T](fileData))
} catch {
case e: NoNodeException => None
- case e: Exception => {
+ case e: Exception =>
logWarning("Exception while reading persisted file, deleting", e)
zk.delete().forPath(zkPath)
None
- }
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
index 374c79a7e5..1b7ac172de 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
@@ -55,11 +55,10 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
Some(vol.setContainerPath(container_path)
.setHostPath(host_path)
.setMode(Volume.Mode.RO))
- case spec => {
+ case spec =>
logWarning(s"Unable to parse volume specs: $volumes. "
+ "Expected form: \"[host-dir:]container-dir[:rw|:ro](, ...)\"")
None
- }
}
}
.map { _.build() }
@@ -90,11 +89,10 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
Some(portmap.setHostPort(host_port.toInt)
.setContainerPort(container_port.toInt)
.setProtocol(protocol))
- case spec => {
+ case spec =>
logWarning(s"Unable to parse port mapping specs: $portmaps. "
+ "Expected form: \"host_port:container_port[:udp|:tcp](, ...)\"")
None
- }
}
}
.map { _.build() }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 233bdc23e6..7295d50682 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -124,11 +124,10 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
markErr()
}
} catch {
- case e: Exception => {
+ case e: Exception =>
logError("driver.run() failed", e)
error = Some(e)
markErr()
- }
}
}
}.start()
@@ -184,7 +183,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
var remain = amountToUse
var requestedResources = new ArrayBuffer[Resource]
val remainingResources = resources.asScala.map {
- case r => {
+ case r =>
if (remain > 0 &&
r.getType == Value.Type.SCALAR &&
r.getScalar.getValue > 0.0 &&
@@ -196,7 +195,6 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
} else {
r
}
- }
}
// Filter any resource that has depleted.
diff --git a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
index 5ead40e89e..cb95246d5b 100644
--- a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
@@ -188,10 +188,9 @@ abstract class DeserializationStream {
try {
(readKey[Any](), readValue[Any]())
} catch {
- case eof: EOFException => {
+ case eof: EOFException =>
finished = true
null
- }
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 25edb9f1e4..4ec5b4bbb0 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -143,13 +143,12 @@ final class ShuffleBlockFetcherIterator(
while (iter.hasNext) {
val result = iter.next()
result match {
- case SuccessFetchResult(_, address, _, buf, _) => {
+ case SuccessFetchResult(_, address, _, buf, _) =>
if (address != blockManager.blockManagerId) {
shuffleMetrics.incRemoteBytesRead(buf.size)
shuffleMetrics.incRemoteBlocksFetched(1)
}
buf.release()
- }
case _ =>
}
}
@@ -313,7 +312,7 @@ final class ShuffleBlockFetcherIterator(
shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait)
result match {
- case SuccessFetchResult(_, address, size, buf, isNetworkReqDone) => {
+ case SuccessFetchResult(_, address, size, buf, isNetworkReqDone) =>
if (address != blockManager.blockManagerId) {
shuffleMetrics.incRemoteBytesRead(buf.size)
shuffleMetrics.incRemoteBlocksFetched(1)
@@ -323,7 +322,6 @@ final class ShuffleBlockFetcherIterator(
reqsInFlight -= 1
logDebug("Number of requests in flight " + reqsInFlight)
}
- }
case _ =>
}
// Send fetch requests up to maxBytesInFlight
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala
index cc476d61b5..a0ef80d9bd 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala
@@ -38,7 +38,7 @@ private[ui] class ExecutorThreadDumpPage(parent: ExecutorsTab) extends WebUIPage
val content = maybeThreadDump.map { threadDump =>
val dumpRows = threadDump.sortWith {
- case (threadTrace1, threadTrace2) => {
+ case (threadTrace1, threadTrace2) =>
val v1 = if (threadTrace1.threadName.contains("Executor task launch")) 1 else 0
val v2 = if (threadTrace2.threadName.contains("Executor task launch")) 1 else 0
if (v1 == v2) {
@@ -46,7 +46,6 @@ private[ui] class ExecutorThreadDumpPage(parent: ExecutorsTab) extends WebUIPage
} else {
v1 > v2
}
- }
}.map { thread =>
val threadId = thread.threadId
<tr id={s"thread_${threadId}_tr"} class="accordion-heading"
diff --git a/core/src/main/scala/org/apache/spark/util/EventLoop.scala b/core/src/main/scala/org/apache/spark/util/EventLoop.scala
index 153025cef2..3ea9139e11 100644
--- a/core/src/main/scala/org/apache/spark/util/EventLoop.scala
+++ b/core/src/main/scala/org/apache/spark/util/EventLoop.scala
@@ -47,13 +47,12 @@ private[spark] abstract class EventLoop[E](name: String) extends Logging {
try {
onReceive(event)
} catch {
- case NonFatal(e) => {
+ case NonFatal(e) =>
try {
onError(e)
} catch {
case NonFatal(e) => logError("Unexpected error in " + name, e)
}
- }
}
}
} catch {
diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
index 3f627a0145..6861a75612 100644
--- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
+++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -151,13 +151,12 @@ object SizeEstimator extends Logging {
// TODO: We could use reflection on the VMOption returned ?
getVMMethod.invoke(bean, "UseCompressedOops").toString.contains("true")
} catch {
- case e: Exception => {
+ case e: Exception =>
// Guess whether they've enabled UseCompressedOops based on whether maxMemory < 32 GB
val guess = Runtime.getRuntime.maxMemory < (32L*1024*1024*1024)
val guessInWords = if (guess) "yes" else "not"
logWarning("Failed to check whether UseCompressedOops is set; assuming " + guessInWords)
return guess
- }
}
}
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 67d722c1dc..2110d3d770 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -320,7 +320,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
Thread.sleep(200)
}
} catch {
- case _: Throwable => { Thread.sleep(10) }
+ case _: Throwable => Thread.sleep(10)
// Do nothing. We might see exceptions because block manager
// is racing this thread to remove entries from the driver.
}
diff --git a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
index 3706455c3f..8feb3dee05 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
@@ -82,20 +82,18 @@ package object testPackage extends Assertions {
val curCallSite = sc.getCallSite().shortForm // note: 2 lines after definition of "rdd"
val rddCreationLine = rddCreationSite match {
- case CALL_SITE_REGEX(func, file, line) => {
+ case CALL_SITE_REGEX(func, file, line) =>
assert(func === "makeRDD")
assert(file === "SparkContextInfoSuite.scala")
line.toInt
- }
case _ => fail("Did not match expected call site format")
}
curCallSite match {
- case CALL_SITE_REGEX(func, file, line) => {
+ case CALL_SITE_REGEX(func, file, line) =>
assert(func === "getCallSite") // this is correct because we called it from outside of Spark
assert(file === "SparkContextInfoSuite.scala")
assert(line.toInt === rddCreationLine.toInt + 2)
- }
case _ => fail("Did not match expected call site format")
}
}
diff --git a/core/src/test/scala/org/apache/spark/UnpersistSuite.scala b/core/src/test/scala/org/apache/spark/UnpersistSuite.scala
index f7a13ab399..09e21646ee 100644
--- a/core/src/test/scala/org/apache/spark/UnpersistSuite.scala
+++ b/core/src/test/scala/org/apache/spark/UnpersistSuite.scala
@@ -35,7 +35,7 @@ class UnpersistSuite extends SparkFunSuite with LocalSparkContext {
Thread.sleep(200)
}
} catch {
- case _: Throwable => { Thread.sleep(10) }
+ case _: Throwable => Thread.sleep(10)
// Do nothing. We might see exceptions because block manager
// is racing this thread to remove entries from the driver.
}
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index 43e61241b6..cebac2097f 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -127,9 +127,8 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
override val rpcEnv = env
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
- case msg: String => {
+ case msg: String =>
context.reply(msg)
- }
}
})
val reply = rpcEndpointRef.askWithRetry[String]("hello")
@@ -141,9 +140,8 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
override val rpcEnv = env
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
- case msg: String => {
+ case msg: String =>
context.reply(msg)
- }
}
})
@@ -164,10 +162,9 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
override val rpcEnv = env
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
- case msg: String => {
+ case msg: String =>
Thread.sleep(100)
context.reply(msg)
- }
}
})
@@ -317,10 +314,9 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
override val rpcEnv = env
override def receive: PartialFunction[Any, Unit] = {
- case m => {
+ case m =>
self
callSelfSuccessfully = true
- }
}
})
@@ -682,9 +678,8 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
override val rpcEnv = localEnv
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
- case msg: String => {
+ case msg: String =>
context.reply(msg)
- }
}
})
val rpcEndpointRef = remoteEnv.setupEndpointRef(localEnv.address, "ask-authentication")