author     Dongjoon Hyun <dongjoon@apache.org>    2016-04-12 00:43:28 -0700
committer  Reynold Xin <rxin@databricks.com>      2016-04-12 00:43:28 -0700
commit     b0f5497e9520575e5082fa8ce8be5569f43abe74
tree       efd349be7227cf20616712fc7376b7c2f11f6614
parent     678b96e77bf77a64b8df14b19db5a3bb18febfe3
[SPARK-14508][BUILD] Add a new ScalaStyle Rule `OmitBracesInCase`
## What changes were proposed in this pull request?

According to the [Spark Code Style Guide](https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide) and the [Scala Style Guide](http://docs.scala-lang.org/style/control-structures.html#curlybraces), we should enforce the following rule:

```
case: Always omit braces in case clauses.
```

This PR adds a new ScalaStyle rule, `OmitBracesInCase`, and applies it across the code base.

## How was this patch tested?

Pass the Jenkins tests (including the Scala style checks).

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #12280 from dongjoon-hyun/SPARK-14508.
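To make the rule concrete, here is a minimal illustrative Scala snippet (not taken from the commit; the object name and the environment-variable lookup are invented for the example). The first `match` uses the braced case bodies that the new `OmitBracesInCase` check rejects; the second shows the brace-free form it enforces.

```scala
object OmitBracesInCaseExample extends App {
  // Old style, now flagged: redundant braces around the case body.
  sys.env.get("SPARK_HOME") match {
    case Some(home) => {
      println(s"SPARK_HOME is set to $home")
    }
    case None => println("SPARK_HOME is not set")
  }

  // Preferred style: omit the braces; indentation alone delimits the case body.
  sys.env.get("SPARK_HOME") match {
    case Some(home) =>
      println(s"SPARK_HOME is set to $home")
    case None =>
      println("SPARK_HOME is not set")
  }
}
```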
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 48
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala | 6
-rwxr-xr-x  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala | 9
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 9
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/serializer/Serializer.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/util/EventLoop.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/util/SizeEstimator.scala | 3
-rw-r--r--  core/src/test/scala/org/apache/spark/DistributedSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala | 6
-rw-r--r--  core/src/test/scala/org/apache/spark/UnpersistSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala | 15
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala | 6
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala | 6
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/LocalALS.scala | 6
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala | 6
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala | 6
-rw-r--r--  external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala | 25
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala | 6
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala | 3
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala | 3
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala | 6
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala | 3
-rw-r--r--  repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala | 36
-rw-r--r--  scalastyle-config.xml | 5
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala | 3
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala | 4
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala | 18
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 9
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala | 8
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala | 3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala | 3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala | 3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala | 26
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala | 3
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala | 6
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala | 3
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 3
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala | 3
-rw-r--r--  yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala | 6
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala | 24
64 files changed, 164 insertions(+), 293 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f0d152f05a..966198dd5e 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2397,9 +2397,8 @@ object SparkContext extends Logging {
} catch {
// TODO: Enumerate the exact reasons why it can fail
// But irrespective of it, it means we cannot proceed !
- case e: Exception => {
+ case e: Exception =>
throw new SparkException("YARN mode not available ?", e)
- }
}
val backend = try {
val clazz =
@@ -2407,9 +2406,8 @@ object SparkContext extends Logging {
val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
} catch {
- case e: Exception => {
+ case e: Exception =>
throw new SparkException("YARN mode not available ?", e)
- }
}
scheduler.initialize(backend)
(backend, scheduler)
@@ -2421,9 +2419,8 @@ object SparkContext extends Logging {
cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
} catch {
- case e: Exception => {
+ case e: Exception =>
throw new SparkException("YARN mode not available ?", e)
- }
}
val backend = try {
@@ -2432,9 +2429,8 @@ object SparkContext extends Logging {
val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
} catch {
- case e: Exception => {
+ case e: Exception =>
throw new SparkException("YARN mode not available ?", e)
- }
}
scheduler.initialize(backend)
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index ab89f4c4e4..3d11db7461 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -101,14 +101,13 @@ class SparkEnv (
// We only need to delete the tmp dir create by driver, because sparkFilesDir is point to the
// current working dir in executor which we do not need to delete.
driverTmpDirToDelete match {
- case Some(path) => {
+ case Some(path) =>
try {
Utils.deleteRecursively(new File(path))
} catch {
case e: Exception =>
logWarning(s"Exception while deleting Spark temp dir: $path", e)
}
- }
case None => // We just need to delete tmp dir created by driver, so do nothing on executor
}
}
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
index 6f6730690f..6259bead3e 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -134,11 +134,10 @@ private[python] class JavaToWritableConverter extends Converter[Any, Writable] {
mapWritable.put(convertToWritable(k), convertToWritable(v))
}
mapWritable
- case array: Array[Any] => {
+ case array: Array[Any] =>
val arrayWriteable = new ArrayWritable(classOf[Writable])
arrayWriteable.set(array.map(convertToWritable(_)))
arrayWriteable
- }
case other => throw new SparkException(
s"Data of type ${other.getClass.getName} cannot be used")
}
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 4bca16a234..ab5b6c8380 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -470,7 +470,7 @@ private[spark] object PythonRDD extends Logging {
objs.append(obj)
}
} catch {
- case eof: EOFException => {}
+ case eof: EOFException => // No-op
}
JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
} finally {
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 41ac308808..cda9d38c6a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -152,10 +152,9 @@ class SparkHadoopUtil extends Logging {
val baselineBytesRead = f()
Some(() => f() - baselineBytesRead)
} catch {
- case e @ (_: NoSuchMethodException | _: ClassNotFoundException) => {
+ case e @ (_: NoSuchMethodException | _: ClassNotFoundException) =>
logDebug("Couldn't find method for retrieving thread-level FileSystem input data", e)
None
- }
}
}
@@ -174,10 +173,9 @@ class SparkHadoopUtil extends Logging {
val baselineBytesWritten = f()
Some(() => f() - baselineBytesWritten)
} catch {
- case e @ (_: NoSuchMethodException | _: ClassNotFoundException) => {
+ case e @ (_: NoSuchMethodException | _: ClassNotFoundException) =>
logDebug("Couldn't find method for retrieving thread-level FileSystem output data", e)
None
- }
}
}
@@ -315,7 +313,7 @@ class SparkHadoopUtil extends Logging {
*/
def substituteHadoopVariables(text: String, hadoopConf: Configuration): String = {
text match {
- case HADOOP_CONF_PATTERN(matched) => {
+ case HADOOP_CONF_PATTERN(matched) =>
logDebug(text + " matched " + HADOOP_CONF_PATTERN)
val key = matched.substring(13, matched.length() - 1) // remove ${hadoopconf- .. }
val eval = Option[String](hadoopConf.get(key))
@@ -330,11 +328,9 @@ class SparkHadoopUtil extends Logging {
// Continue to substitute more variables.
substituteHadoopVariables(eval.get, hadoopConf)
}
- }
- case _ => {
+ case _ =>
logDebug(text + " didn't match " + HADOOP_CONF_PATTERN)
text
- }
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 01901bbf85..9bd3fc1033 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -217,7 +217,7 @@ private[deploy] class Master(
}
override def receive: PartialFunction[Any, Unit] = {
- case ElectedLeader => {
+ case ElectedLeader =>
val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
RecoveryState.ALIVE
@@ -233,16 +233,14 @@ private[deploy] class Master(
}
}, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
}
- }
case CompleteRecovery => completeRecovery()
- case RevokedLeadership => {
+ case RevokedLeadership =>
logError("Leadership has been revoked -- master shutting down.")
System.exit(0)
- }
- case RegisterApplication(description, driver) => {
+ case RegisterApplication(description, driver) =>
// TODO Prevent repeated registrations from some driver
if (state == RecoveryState.STANDBY) {
// ignore, don't send response
@@ -255,12 +253,11 @@ private[deploy] class Master(
driver.send(RegisteredApplication(app.id, self))
schedule()
}
- }
- case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
+ case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
execOption match {
- case Some(exec) => {
+ case Some(exec) =>
val appInfo = idToApp(appId)
val oldState = exec.state
exec.state = state
@@ -298,22 +295,19 @@ private[deploy] class Master(
}
}
}
- }
case None =>
logWarning(s"Got status update for unknown executor $appId/$execId")
}
- }
- case DriverStateChanged(driverId, state, exception) => {
+ case DriverStateChanged(driverId, state, exception) =>
state match {
case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
removeDriver(driverId, state, exception)
case _ =>
throw new Exception(s"Received unexpected state update for driver $driverId: $state")
}
- }
- case Heartbeat(workerId, worker) => {
+ case Heartbeat(workerId, worker) =>
idToWorker.get(workerId) match {
case Some(workerInfo) =>
workerInfo.lastHeartbeat = System.currentTimeMillis()
@@ -327,9 +321,8 @@ private[deploy] class Master(
" This worker was never registered, so ignoring the heartbeat.")
}
}
- }
- case MasterChangeAcknowledged(appId) => {
+ case MasterChangeAcknowledged(appId) =>
idToApp.get(appId) match {
case Some(app) =>
logInfo("Application has been re-registered: " + appId)
@@ -339,9 +332,8 @@ private[deploy] class Master(
}
if (canCompleteRecovery) { completeRecovery() }
- }
- case WorkerSchedulerStateResponse(workerId, executors, driverIds) => {
+ case WorkerSchedulerStateResponse(workerId, executors, driverIds) =>
idToWorker.get(workerId) match {
case Some(worker) =>
logInfo("Worker has been re-registered: " + workerId)
@@ -367,7 +359,6 @@ private[deploy] class Master(
}
if (canCompleteRecovery) { completeRecovery() }
- }
case WorkerLatestState(workerId, executors, driverIds) =>
idToWorker.get(workerId) match {
@@ -397,9 +388,8 @@ private[deploy] class Master(
logInfo(s"Received unregister request from application $applicationId")
idToApp.get(applicationId).foreach(finishApplication)
- case CheckForWorkerTimeOut => {
+ case CheckForWorkerTimeOut =>
timeOutDeadWorkers()
- }
case AttachCompletedRebuildUI(appId) =>
// An asyncRebuildSparkUI has completed, so need to attach to master webUi
@@ -408,7 +398,7 @@ private[deploy] class Master(
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RegisterWorker(
- id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) => {
+ id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) =>
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
workerHost, workerPort, cores, Utils.megabytesToString(memory)))
if (state == RecoveryState.STANDBY) {
@@ -430,9 +420,8 @@ private[deploy] class Master(
+ workerAddress))
}
}
- }
- case RequestSubmitDriver(description) => {
+ case RequestSubmitDriver(description) =>
if (state != RecoveryState.ALIVE) {
val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
"Can only accept driver submissions in ALIVE state."
@@ -451,9 +440,8 @@ private[deploy] class Master(
context.reply(SubmitDriverResponse(self, true, Some(driver.id),
s"Driver successfully submitted as ${driver.id}"))
}
- }
- case RequestKillDriver(driverId) => {
+ case RequestKillDriver(driverId) =>
if (state != RecoveryState.ALIVE) {
val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
s"Can only kill drivers in ALIVE state."
@@ -484,9 +472,8 @@ private[deploy] class Master(
context.reply(KillDriverResponse(self, driverId, success = false, msg))
}
}
- }
- case RequestDriverStatus(driverId) => {
+ case RequestDriverStatus(driverId) =>
if (state != RecoveryState.ALIVE) {
val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
"Can only request driver status in ALIVE state."
@@ -501,18 +488,15 @@ private[deploy] class Master(
context.reply(DriverStatusResponse(found = false, None, None, None, None))
}
}
- }
- case RequestMasterState => {
+ case RequestMasterState =>
context.reply(MasterStateResponse(
address.host, address.port, restServerBoundPort,
workers.toArray, apps.toArray, completedApps.toArray,
drivers.toArray, completedDrivers.toArray, state))
- }
- case BoundPortsRequest => {
+ case BoundPortsRequest =>
context.reply(BoundPortsResponse(address.port, webUi.boundPort, restServerBoundPort))
- }
case RequestExecutors(appId, requestedTotal) =>
context.reply(handleRequestExecutors(appId, requestedTotal))
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
index 9cd7458ba0..585e0839d0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
@@ -78,7 +78,7 @@ private[master] class MasterArguments(args: Array[String], conf: SparkConf) {
case ("--help") :: tail =>
printUsageAndExit(0)
- case Nil => {}
+ case Nil => // No-op
case _ =>
printUsageAndExit(1)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
index 79f77212fe..af850e4871 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -70,11 +70,10 @@ private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serializer
try {
Some(serializer.newInstance().deserialize[T](ByteBuffer.wrap(fileData)))
} catch {
- case e: Exception => {
+ case e: Exception =>
logWarning("Exception while reading persisted file, deleting", e)
zk.delete().forPath(WORKING_DIR + "/" + filename)
None
- }
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala
index b97805a28b..11e13441ee 100644
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala
@@ -76,14 +76,13 @@ private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf:
case ("--help") :: tail =>
printUsageAndExit(0)
- case Nil => {
+ case Nil =>
if (masterUrl == null) {
// scalastyle:off println
System.err.println("--master is required")
// scalastyle:on println
printUsageAndExit(1)
}
- }
case _ =>
printUsageAndExit(1)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index f9c92c3bb9..06066248ea 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -179,16 +179,14 @@ private[deploy] class ExecutorRunner(
val message = "Command exited with code " + exitCode
worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
} catch {
- case interrupted: InterruptedException => {
+ case interrupted: InterruptedException =>
logInfo("Runner thread for executor " + fullId + " interrupted")
state = ExecutorState.KILLED
killProcess(None)
- }
- case e: Exception => {
+ case e: Exception =>
logError("Error running executor", e)
state = ExecutorState.FAILED
killProcess(Some(e.toString))
- }
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 1b7637a39c..449beb0811 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -480,7 +480,7 @@ private[deploy] class Worker(
memoryUsed += memory_
sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
} catch {
- case e: Exception => {
+ case e: Exception =>
logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
if (executors.contains(appId + "/" + execId)) {
executors(appId + "/" + execId).kill()
@@ -488,7 +488,6 @@ private[deploy] class Worker(
}
sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
Some(e.toString), None))
- }
}
}
@@ -509,7 +508,7 @@ private[deploy] class Worker(
}
}
- case LaunchDriver(driverId, driverDesc) => {
+ case LaunchDriver(driverId, driverDesc) =>
logInfo(s"Asked to launch driver $driverId")
val driver = new DriverRunner(
conf,
@@ -525,9 +524,8 @@ private[deploy] class Worker(
coresUsed += driverDesc.cores
memoryUsed += driverDesc.mem
- }
- case KillDriver(driverId) => {
+ case KillDriver(driverId) =>
logInfo(s"Asked to kill driver $driverId")
drivers.get(driverId) match {
case Some(runner) =>
@@ -535,11 +533,9 @@ private[deploy] class Worker(
case None =>
logError(s"Asked to kill unknown driver $driverId")
}
- }
- case driverStateChanged @ DriverStateChanged(driverId, state, exception) => {
+ case driverStateChanged @ DriverStateChanged(driverId, state, exception) =>
handleDriverStateChanged(driverStateChanged)
- }
case ReregisterWithMaster =>
reregisterWithMaster()
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
index 391eb41190..777020d4d5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
@@ -165,12 +165,11 @@ private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) {
}
// scalastyle:on classforname
} catch {
- case e: Exception => {
+ case e: Exception =>
totalMb = 2*1024
// scalastyle:off println
System.out.println("Failed to get total physical memory. Using " + totalMb + " MB")
// scalastyle:on println
- }
}
// Leave out 1 GB for the operating system, but don't return a negative memory size
math.max(totalMb - 1024, Utils.DEFAULT_DRIVER_MEM_MB)
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index d4ed5845e7..71b4ad160d 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -62,10 +62,9 @@ private[spark] class CoarseGrainedExecutorBackend(
// This is a very fast action so we can use "ThreadUtils.sameThread"
case Success(msg) =>
// Always receive `true`. Just ignore it
- case Failure(e) => {
+ case Failure(e) =>
logError(s"Cannot register with driver: $driverUrl", e)
System.exit(1)
- }
}(ThreadUtils.sameThread)
}
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index 4da1017d28..0fed991049 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -196,10 +196,9 @@ private[spark] class MetricsSystem private (
sinks += sink.asInstanceOf[Sink]
}
} catch {
- case e: Exception => {
+ case e: Exception =>
logError("Sink class " + classPath + " cannot be instantiated")
throw e
- }
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala b/core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala
index c562c70aba..ab6aba6fc7 100644
--- a/core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala
+++ b/core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala
@@ -32,12 +32,11 @@ class BoundedDouble(val mean: Double, val confidence: Double, val low: Double, v
*/
override def equals(that: Any): Boolean =
that match {
- case that: BoundedDouble => {
+ case that: BoundedDouble =>
this.mean == that.mean &&
this.confidence == that.confidence &&
this.low == that.low &&
this.high == that.high
- }
case _ => false
}
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
index 5e9230e733..368916a39e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -166,8 +166,8 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
val counters = new Array[Long](buckets.length - 1)
while (iter.hasNext) {
bucketFunction(iter.next()) match {
- case Some(x: Int) => {counters(x) += 1}
- case _ => {}
+ case Some(x: Int) => counters(x) += 1
+ case _ => // No-Op
}
}
Iterator(counters)
diff --git a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
index 363004e587..a5992022d0 100644
--- a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
@@ -86,12 +86,11 @@ class OrderedRDDFunctions[K : Ordering : ClassTag,
def inRange(k: K): Boolean = ordering.gteq(k, lower) && ordering.lteq(k, upper)
val rddToFilter: RDD[P] = self.partitioner match {
- case Some(rp: RangePartitioner[K, V]) => {
+ case Some(rp: RangePartitioner[K, V]) =>
val partitionIndicies = (rp.getPartition(lower), rp.getPartition(upper)) match {
case (l, u) => Math.min(l, u) to Math.max(l, u)
}
PartitionPruningRDD.create(self, partitionIndicies.contains)
- }
case _ =>
self
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
index 582fa93afe..462fb39ea2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
@@ -128,7 +128,7 @@ private object ParallelCollectionRDD {
})
}
seq match {
- case r: Range => {
+ case r: Range =>
positions(r.length, numSlices).zipWithIndex.map({ case ((start, end), index) =>
// If the range is inclusive, use inclusive range for the last slice
if (r.isInclusive && index == numSlices - 1) {
@@ -138,8 +138,7 @@ private object ParallelCollectionRDD {
new Range(r.start + start * r.step, r.start + end * r.step, r.step)
}
}).toSeq.asInstanceOf[Seq[Seq[T]]]
- }
- case nr: NumericRange[_] => {
+ case nr: NumericRange[_] =>
// For ranges of Long, Double, BigInteger, etc
val slices = new ArrayBuffer[Seq[T]](numSlices)
var r = nr
@@ -149,14 +148,12 @@ private object ParallelCollectionRDD {
r = r.drop(sliceSize)
}
slices
- }
- case _ => {
+ case _ =>
val array = seq.toArray // To prevent O(n^2) operations for List etc
positions(array.length, numSlices).map({
case (start, end) =>
array.slice(start, end).toSeq
}).toSeq
- }
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
index 9e3880714a..c3579d761d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
@@ -78,11 +78,10 @@ class PartitionerAwareUnionRDD[T: ClassTag](
logDebug("Finding preferred location for " + this + ", partition " + s.index)
val parentPartitions = s.asInstanceOf[PartitionerAwareUnionRDDPartition].parents
val locations = rdds.zip(parentPartitions).flatMap {
- case (rdd, part) => {
+ case (rdd, part) =>
val parentLocations = currPrefLocs(rdd, part)
logDebug("Location of " + rdd + " partition " + part.index + " = " + parentLocations)
parentLocations
- }
}
val location = if (locations.isEmpty) {
None
diff --git a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
index 0640f26051..a6b032cc00 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
@@ -57,11 +57,10 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
// Since we are not doing canonicalization of path, this can be wrong : like relative vs
// absolute path .. which is fine, this is best case effort to remove duplicates - right ?
override def equals(other: Any): Boolean = other match {
- case that: InputFormatInfo => {
+ case that: InputFormatInfo =>
// not checking config - that should be fine, right ?
this.inputFormatClazz == that.inputFormatClazz &&
this.path == that.path
- }
case _ => false
}
@@ -86,10 +85,9 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
}
}
catch {
- case e: ClassNotFoundException => {
+ case e: ClassNotFoundException =>
throw new IllegalArgumentException("Specified inputformat " + inputFormatClazz +
" cannot be found ?", e)
- }
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala
index 6e9337bb90..bc1431835e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala
@@ -49,14 +49,13 @@ class SplitInfo(
// So unless there is identity equality between underlyingSplits, it will always fail even if it
// is pointing to same block.
override def equals(other: Any): Boolean = other match {
- case that: SplitInfo => {
+ case that: SplitInfo =>
this.hostLocation == that.hostLocation &&
this.inputFormatClazz == that.inputFormatClazz &&
this.path == that.path &&
this.length == that.length &&
// other split specific checks (like start for FileSplit)
this.underlyingSplit == that.underlyingSplit
- }
case _ => false
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index 873f1b56bd..ae7ef46abb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -133,7 +133,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
// if we can't deserialize the reason.
logError(
"Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader)
- case ex: Exception => {}
+ case ex: Exception => // No-op
}
scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 15d3515a02..6e08cdd87a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -188,20 +188,18 @@ private[spark] class TaskSetManager(
loc match {
case e: ExecutorCacheTaskLocation =>
pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
- case e: HDFSCacheTaskLocation => {
+ case e: HDFSCacheTaskLocation =>
val exe = sched.getExecutorsAliveOnHost(loc.host)
exe match {
- case Some(set) => {
+ case Some(set) =>
for (e <- set) {
pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
}
logInfo(s"Pending task $index has a cached location at ${e.host} " +
", where there are executors " + set.mkString(","))
- }
case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
", but there are no executors alive there.")
}
- }
case _ =>
}
pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
@@ -437,7 +435,7 @@ private[spark] class TaskSetManager(
}
dequeueTask(execId, host, allowedLocality) match {
- case Some((index, taskLocality, speculative)) => {
+ case Some((index, taskLocality, speculative)) =>
// Found a task; do some bookkeeping and return a task description
val task = tasks(index)
val taskId = sched.newTaskId()
@@ -486,7 +484,6 @@ private[spark] class TaskSetManager(
sched.dagScheduler.taskStarted(task, info)
return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
taskName, index, serializedTask))
- }
case _ =>
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
index 3971e6c382..61ab3e87c5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
@@ -121,11 +121,10 @@ private[spark] class ZookeeperMesosClusterPersistenceEngine(
Some(Utils.deserialize[T](fileData))
} catch {
case e: NoNodeException => None
- case e: Exception => {
+ case e: Exception =>
logWarning("Exception while reading persisted file, deleting", e)
zk.delete().forPath(zkPath)
None
- }
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
index 374c79a7e5..1b7ac172de 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
@@ -55,11 +55,10 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
Some(vol.setContainerPath(container_path)
.setHostPath(host_path)
.setMode(Volume.Mode.RO))
- case spec => {
+ case spec =>
logWarning(s"Unable to parse volume specs: $volumes. "
+ "Expected form: \"[host-dir:]container-dir[:rw|:ro](, ...)\"")
None
- }
}
}
.map { _.build() }
@@ -90,11 +89,10 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
Some(portmap.setHostPort(host_port.toInt)
.setContainerPort(container_port.toInt)
.setProtocol(protocol))
- case spec => {
+ case spec =>
logWarning(s"Unable to parse port mapping specs: $portmaps. "
+ "Expected form: \"host_port:container_port[:udp|:tcp](, ...)\"")
None
- }
}
}
.map { _.build() }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 233bdc23e6..7295d50682 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -124,11 +124,10 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
markErr()
}
} catch {
- case e: Exception => {
+ case e: Exception =>
logError("driver.run() failed", e)
error = Some(e)
markErr()
- }
}
}
}.start()
@@ -184,7 +183,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
var remain = amountToUse
var requestedResources = new ArrayBuffer[Resource]
val remainingResources = resources.asScala.map {
- case r => {
+ case r =>
if (remain > 0 &&
r.getType == Value.Type.SCALAR &&
r.getScalar.getValue > 0.0 &&
@@ -196,7 +195,6 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
} else {
r
}
- }
}
// Filter any resource that has depleted.
diff --git a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
index 5ead40e89e..cb95246d5b 100644
--- a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
@@ -188,10 +188,9 @@ abstract class DeserializationStream {
try {
(readKey[Any](), readValue[Any]())
} catch {
- case eof: EOFException => {
+ case eof: EOFException =>
finished = true
null
- }
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 25edb9f1e4..4ec5b4bbb0 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -143,13 +143,12 @@ final class ShuffleBlockFetcherIterator(
while (iter.hasNext) {
val result = iter.next()
result match {
- case SuccessFetchResult(_, address, _, buf, _) => {
+ case SuccessFetchResult(_, address, _, buf, _) =>
if (address != blockManager.blockManagerId) {
shuffleMetrics.incRemoteBytesRead(buf.size)
shuffleMetrics.incRemoteBlocksFetched(1)
}
buf.release()
- }
case _ =>
}
}
@@ -313,7 +312,7 @@ final class ShuffleBlockFetcherIterator(
shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait)
result match {
- case SuccessFetchResult(_, address, size, buf, isNetworkReqDone) => {
+ case SuccessFetchResult(_, address, size, buf, isNetworkReqDone) =>
if (address != blockManager.blockManagerId) {
shuffleMetrics.incRemoteBytesRead(buf.size)
shuffleMetrics.incRemoteBlocksFetched(1)
@@ -323,7 +322,6 @@ final class ShuffleBlockFetcherIterator(
reqsInFlight -= 1
logDebug("Number of requests in flight " + reqsInFlight)
}
- }
case _ =>
}
// Send fetch requests up to maxBytesInFlight
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala
index cc476d61b5..a0ef80d9bd 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala
@@ -38,7 +38,7 @@ private[ui] class ExecutorThreadDumpPage(parent: ExecutorsTab) extends WebUIPage
val content = maybeThreadDump.map { threadDump =>
val dumpRows = threadDump.sortWith {
- case (threadTrace1, threadTrace2) => {
+ case (threadTrace1, threadTrace2) =>
val v1 = if (threadTrace1.threadName.contains("Executor task launch")) 1 else 0
val v2 = if (threadTrace2.threadName.contains("Executor task launch")) 1 else 0
if (v1 == v2) {
@@ -46,7 +46,6 @@ private[ui] class ExecutorThreadDumpPage(parent: ExecutorsTab) extends WebUIPage
} else {
v1 > v2
}
- }
}.map { thread =>
val threadId = thread.threadId
<tr id={s"thread_${threadId}_tr"} class="accordion-heading"
diff --git a/core/src/main/scala/org/apache/spark/util/EventLoop.scala b/core/src/main/scala/org/apache/spark/util/EventLoop.scala
index 153025cef2..3ea9139e11 100644
--- a/core/src/main/scala/org/apache/spark/util/EventLoop.scala
+++ b/core/src/main/scala/org/apache/spark/util/EventLoop.scala
@@ -47,13 +47,12 @@ private[spark] abstract class EventLoop[E](name: String) extends Logging {
try {
onReceive(event)
} catch {
- case NonFatal(e) => {
+ case NonFatal(e) =>
try {
onError(e)
} catch {
case NonFatal(e) => logError("Unexpected error in " + name, e)
}
- }
}
}
} catch {
diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
index 3f627a0145..6861a75612 100644
--- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
+++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -151,13 +151,12 @@ object SizeEstimator extends Logging {
// TODO: We could use reflection on the VMOption returned ?
getVMMethod.invoke(bean, "UseCompressedOops").toString.contains("true")
} catch {
- case e: Exception => {
+ case e: Exception =>
// Guess whether they've enabled UseCompressedOops based on whether maxMemory < 32 GB
val guess = Runtime.getRuntime.maxMemory < (32L*1024*1024*1024)
val guessInWords = if (guess) "yes" else "not"
logWarning("Failed to check whether UseCompressedOops is set; assuming " + guessInWords)
return guess
- }
}
}
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 67d722c1dc..2110d3d770 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -320,7 +320,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
Thread.sleep(200)
}
} catch {
- case _: Throwable => { Thread.sleep(10) }
+ case _: Throwable => Thread.sleep(10)
// Do nothing. We might see exceptions because block manager
// is racing this thread to remove entries from the driver.
}
diff --git a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
index 3706455c3f..8feb3dee05 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
@@ -82,20 +82,18 @@ package object testPackage extends Assertions {
val curCallSite = sc.getCallSite().shortForm // note: 2 lines after definition of "rdd"
val rddCreationLine = rddCreationSite match {
- case CALL_SITE_REGEX(func, file, line) => {
+ case CALL_SITE_REGEX(func, file, line) =>
assert(func === "makeRDD")
assert(file === "SparkContextInfoSuite.scala")
line.toInt
- }
case _ => fail("Did not match expected call site format")
}
curCallSite match {
- case CALL_SITE_REGEX(func, file, line) => {
+ case CALL_SITE_REGEX(func, file, line) =>
assert(func === "getCallSite") // this is correct because we called it from outside of Spark
assert(file === "SparkContextInfoSuite.scala")
assert(line.toInt === rddCreationLine.toInt + 2)
- }
case _ => fail("Did not match expected call site format")
}
}
diff --git a/core/src/test/scala/org/apache/spark/UnpersistSuite.scala b/core/src/test/scala/org/apache/spark/UnpersistSuite.scala
index f7a13ab399..09e21646ee 100644
--- a/core/src/test/scala/org/apache/spark/UnpersistSuite.scala
+++ b/core/src/test/scala/org/apache/spark/UnpersistSuite.scala
@@ -35,7 +35,7 @@ class UnpersistSuite extends SparkFunSuite with LocalSparkContext {
Thread.sleep(200)
}
} catch {
- case _: Throwable => { Thread.sleep(10) }
+ case _: Throwable => Thread.sleep(10)
// Do nothing. We might see exceptions because block manager
// is racing this thread to remove entries from the driver.
}
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index 43e61241b6..cebac2097f 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -127,9 +127,8 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
override val rpcEnv = env
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
- case msg: String => {
+ case msg: String =>
context.reply(msg)
- }
}
})
val reply = rpcEndpointRef.askWithRetry[String]("hello")
@@ -141,9 +140,8 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
override val rpcEnv = env
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
- case msg: String => {
+ case msg: String =>
context.reply(msg)
- }
}
})
@@ -164,10 +162,9 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
override val rpcEnv = env
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
- case msg: String => {
+ case msg: String =>
Thread.sleep(100)
context.reply(msg)
- }
}
})
@@ -317,10 +314,9 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
override val rpcEnv = env
override def receive: PartialFunction[Any, Unit] = {
- case m => {
+ case m =>
self
callSelfSuccessfully = true
- }
}
})
@@ -682,9 +678,8 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
override val rpcEnv = localEnv
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
- case msg: String => {
+ case msg: String =>
context.reply(msg)
- }
}
})
val rpcEndpointRef = remoteEnv.setupEndpointRef(localEnv.address, "ask-authentication")
diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
index 973b005f91..ca4eea2356 100644
--- a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
@@ -106,9 +106,8 @@ object CassandraCQLTest {
println("Count: " + casRdd.count)
val productSaleRDD = casRdd.map {
- case (key, value) => {
+ case (key, value) =>
(ByteBufferUtil.string(value.get("prod_id")), ByteBufferUtil.toInt(value.get("quantity")))
- }
}
val aggregatedRDD = productSaleRDD.reduceByKey(_ + _)
aggregatedRDD.collect().foreach {
@@ -116,11 +115,10 @@ object CassandraCQLTest {
}
val casoutputCF = aggregatedRDD.map {
- case (productId, saleCount) => {
+ case (productId, saleCount) =>
val outKey = Collections.singletonMap("prod_id", ByteBufferUtil.bytes(productId))
val outVal = Collections.singletonList(ByteBufferUtil.bytes(saleCount))
(outKey, outVal)
- }
}
casoutputCF.saveAsNewAPIHadoopFile(
diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
index 6a8f73ad00..eff840d36e 100644
--- a/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
@@ -90,9 +90,8 @@ object CassandraTest {
// Let us first get all the paragraphs from the retrieved rows
val paraRdd = casRdd.map {
- case (key, value) => {
+ case (key, value) =>
ByteBufferUtil.string(value.get(ByteBufferUtil.bytes("para")).value())
- }
}
// Lets get the word count in paras
@@ -103,7 +102,7 @@ object CassandraTest {
}
counts.map {
- case (word, count) => {
+ case (word, count) =>
val colWord = new org.apache.cassandra.thrift.Column()
colWord.setName(ByteBufferUtil.bytes("word"))
colWord.setValue(ByteBufferUtil.bytes(word))
@@ -122,7 +121,6 @@ object CassandraTest {
mutations.get(1).setColumn_or_supercolumn(new ColumnOrSuperColumn())
mutations.get(1).column_or_supercolumn.setColumn(colCount)
(outputkey, mutations)
- }
}.saveAsNewAPIHadoopFile("casDemo", classOf[ByteBuffer], classOf[List[Mutation]],
classOf[ColumnFamilyOutputFormat], job.getConfiguration)
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala
index af5f216f28..fa10101955 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala
@@ -104,16 +104,14 @@ object LocalALS {
def main(args: Array[String]) {
args match {
- case Array(m, u, f, iters) => {
+ case Array(m, u, f, iters) =>
M = m.toInt
U = u.toInt
F = f.toInt
ITERATIONS = iters.toInt
- }
- case _ => {
+ case _ =>
System.err.println("Usage: LocalALS <M> <U> <F> <iters>")
System.exit(1)
- }
}
showWarning()
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala
index a0bb5dabf4..0b5d31c0ff 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala
@@ -118,17 +118,15 @@ object OneVsRestExample {
val inputData = sqlContext.read.format("libsvm").load(params.input)
// compute the train/test split: if testInput is not provided use part of input.
val data = params.testInput match {
- case Some(t) => {
+ case Some(t) =>
// compute the number of features in the training set.
val numFeatures = inputData.first().getAs[Vector](1).size
val testData = sqlContext.read.option("numFeatures", numFeatures.toString)
.format("libsvm").load(t)
Array[DataFrame](inputData, testData)
- }
- case None => {
+ case None =>
val f = params.fracTest
inputData.randomSplit(Array(1 - f, f), seed = 12345)
- }
}
val Array(train, test) = data.map(_.cache())
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala
index c263f4f595..ee811d3aa1 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala
@@ -180,7 +180,7 @@ object DecisionTreeRunner {
}
// For classification, re-index classes if needed.
val (examples, classIndexMap, numClasses) = algo match {
- case Classification => {
+ case Classification =>
// classCounts: class --> # examples in class
val classCounts = origExamples.map(_.label).countByValue()
val sortedClasses = classCounts.keys.toList.sorted
@@ -209,7 +209,6 @@ object DecisionTreeRunner {
println(s"$c\t$frac\t${classCounts(c)}")
}
(examples, classIndexMap, numClasses)
- }
case Regression =>
(origExamples, null, 0)
case _ =>
@@ -225,7 +224,7 @@ object DecisionTreeRunner {
case "libsvm" => MLUtils.loadLibSVMFile(sc, testInput, numFeatures)
}
algo match {
- case Classification => {
+ case Classification =>
// classCounts: class --> # examples in class
val testExamples = {
if (classIndexMap.isEmpty) {
@@ -235,7 +234,6 @@ object DecisionTreeRunner {
}
}
Array(examples, testExamples)
- }
case Regression =>
Array(examples, origTestExamples)
}
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
index 41c6ab123b..80e0cce055 100644
--- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
@@ -73,7 +73,7 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w
logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
receiver.setCheckpointer(shardId, checkpointer)
} catch {
- case NonFatal(e) => {
+ case NonFatal(e) =>
/*
* If there is a failure within the batch, the batch will not be checkpointed.
* This will potentially cause records since the last checkpoint to be processed
@@ -84,7 +84,6 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w
/* Rethrow the exception to the Kinesis Worker that is managing this RecordProcessor. */
throw e
- }
}
} else {
/* RecordProcessor has been stopped. */
@@ -148,29 +147,25 @@ private[kinesis] object KinesisRecordProcessor extends Logging {
/* If the function failed, either retry or throw the exception */
case util.Failure(e) => e match {
/* Retry: Throttling or other Retryable exception has occurred */
- case _: ThrottlingException | _: KinesisClientLibDependencyException if numRetriesLeft > 1
- => {
- val backOffMillis = Random.nextInt(maxBackOffMillis)
- Thread.sleep(backOffMillis)
- logError(s"Retryable Exception: Random backOffMillis=${backOffMillis}", e)
- retryRandom(expression, numRetriesLeft - 1, maxBackOffMillis)
- }
+ case _: ThrottlingException | _: KinesisClientLibDependencyException
+ if numRetriesLeft > 1 =>
+ val backOffMillis = Random.nextInt(maxBackOffMillis)
+ Thread.sleep(backOffMillis)
+ logError(s"Retryable Exception: Random backOffMillis=${backOffMillis}", e)
+ retryRandom(expression, numRetriesLeft - 1, maxBackOffMillis)
/* Throw: Shutdown has been requested by the Kinesis Client Library. */
- case _: ShutdownException => {
+ case _: ShutdownException =>
logError(s"ShutdownException: Caught shutdown exception, skipping checkpoint.", e)
throw e
- }
/* Throw: Non-retryable exception has occurred with the Kinesis Client Library */
- case _: InvalidStateException => {
+ case _: InvalidStateException =>
logError(s"InvalidStateException: Cannot save checkpoint to the DynamoDB table used" +
s" by the Amazon Kinesis Client Library. Table likely doesn't exist.", e)
throw e
- }
/* Throw: Unexpected exception has occurred */
- case _ => {
+ case _ =>
logError(s"Unexpected, non-retryable exception.", e)
throw e
- }
}
}
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala
index 551e75dc0a..fa143715be 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala
@@ -53,7 +53,7 @@ private[r] object SparkRWrappers {
def getModelCoefficients(model: PipelineModel): Array[Double] = {
model.stages.last match {
- case m: LinearRegressionModel => {
+ case m: LinearRegressionModel =>
val coefficientStandardErrorsR = Array(m.summary.coefficientStandardErrors.last) ++
m.summary.coefficientStandardErrors.dropRight(1)
val tValuesR = Array(m.summary.tValues.last) ++ m.summary.tValues.dropRight(1)
@@ -64,14 +64,12 @@ private[r] object SparkRWrappers {
} else {
m.coefficients.toArray ++ coefficientStandardErrorsR ++ tValuesR ++ pValuesR
}
- }
- case m: LogisticRegressionModel => {
+ case m: LogisticRegressionModel =>
if (m.getFitIntercept) {
Array(m.intercept) ++ m.coefficients.toArray
} else {
m.coefficients.toArray
}
- }
}
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala
index 03eb903bb8..f04c87259c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala
@@ -181,13 +181,12 @@ class GaussianMixture private (
val (weights, gaussians) = initialModel match {
case Some(gmm) => (gmm.weights, gmm.gaussians)
- case None => {
+ case None =>
val samples = breezeData.takeSample(withReplacement = true, k * nSamples, seed)
(Array.fill(k)(1.0 / k), Array.tabulate(k) { i =>
val slice = samples.view(i * nSamples, (i + 1) * nSamples)
new MultivariateGaussian(vectorMean(slice), initCovariance(slice))
})
- }
}
var llh = Double.MinValue // current log-likelihood
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
index 02417b1124..f87613cc72 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
@@ -183,7 +183,7 @@ object GaussianMixtureModel extends Loader[GaussianMixtureModel] {
val k = (metadata \ "k").extract[Int]
val classNameV1_0 = SaveLoadV1_0.classNameV1_0
(loadedClassName, version) match {
- case (classNameV1_0, "1.0") => {
+ case (classNameV1_0, "1.0") =>
val model = SaveLoadV1_0.load(sc, path)
require(model.weights.length == k,
s"GaussianMixtureModel requires weights of length $k " +
@@ -192,7 +192,6 @@ object GaussianMixtureModel extends Loader[GaussianMixtureModel] {
s"GaussianMixtureModel requires gaussians of length $k" +
s"got gaussians of length ${model.gaussians.length}")
model
- }
case _ => throw new Exception(
s"GaussianMixtureModel.load did not recognize model with (className, format version):" +
s"($loadedClassName, $version). Supported:\n" +
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index 37a21cd879..8ff0b83e8b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -253,16 +253,14 @@ class KMeans private (
}
val centers = initialModel match {
- case Some(kMeansCenters) => {
+ case Some(kMeansCenters) =>
Array(kMeansCenters.clusterCenters.map(s => new VectorWithNorm(s)))
- }
- case None => {
+ case None =>
if (initializationMode == KMeans.RANDOM) {
initRandom(data)
} else {
initKMeansParallel(data)
}
- }
}
val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) +
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
index 0ec8975fed..ef284531c9 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
@@ -97,7 +97,7 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
: KolmogorovSmirnovTestResult = {
val distObj =
distName match {
- case "norm" => {
+ case "norm" =>
if (params.nonEmpty) {
// parameters are passed, then can only be 2
require(params.length == 2, "Normal distribution requires mean and standard " +
@@ -109,7 +109,6 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
"initialized to standard normal (i.e. N(0, 1))")
new NormalDistribution(0, 1)
}
- }
case _ => throw new UnsupportedOperationException(s"$distName not yet supported through" +
s" convenience method. Current options are:['norm'].")
}
diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
index 928aaa5629..4a15d52b57 100644
--- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
@@ -70,26 +70,24 @@ class ExecutorClassLoader(
}
override def findClass(name: String): Class[_] = {
- userClassPathFirst match {
- case true => findClassLocally(name).getOrElse(parentLoader.loadClass(name))
- case false => {
- try {
- parentLoader.loadClass(name)
- } catch {
- case e: ClassNotFoundException => {
- val classOption = findClassLocally(name)
- classOption match {
- case None =>
- // If this class has a cause, it will break the internal assumption of Janino
- // (the compiler used for Spark SQL code-gen).
- // See org.codehaus.janino.ClassLoaderIClassLoader's findIClass, you will see
- // its behavior will be changed if there is a cause and the compilation
- // of generated class will fail.
- throw new ClassNotFoundException(name)
- case Some(a) => a
- }
+ if (userClassPathFirst) {
+ findClassLocally(name).getOrElse(parentLoader.loadClass(name))
+ } else {
+ try {
+ parentLoader.loadClass(name)
+ } catch {
+ case e: ClassNotFoundException =>
+ val classOption = findClassLocally(name)
+ classOption match {
+ case None =>
+ // If this class has a cause, it will break the internal assumption of Janino
+ // (the compiler used for Spark SQL code-gen).
+ // See org.codehaus.janino.ClassLoaderIClassLoader's findIClass, you will see
+ // its behavior will be changed if there is a cause and the compilation
+ // of generated class will fail.
+ throw new ClassNotFoundException(name)
+ case Some(a) => a
}
- }
}
}
}
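The `ExecutorClassLoader` hunk goes one step further: instead of matching on a `Boolean` (`case true` / `case false`), it uses a plain `if`/`else`. A minimal sketch of the same refactoring shape, with hypothetical names:

```scala
// Hypothetical flag and loaders; not the actual Spark API.
def loadLocally(name: String): Option[String] = None
def loadFromParent(name: String): String = s"parent:$name"

def resolve(name: String, userClassPathFirst: Boolean): String =
  // Preferred over `userClassPathFirst match { case true => ...; case false => ... }`:
  // an if/else reads more directly when the scrutinee is just a Boolean.
  if (userClassPathFirst) {
    loadLocally(name).getOrElse(loadFromParent(name))
  } else {
    loadFromParent(name)
  }
```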
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 472a8f4084..a14e3e583f 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -228,6 +228,11 @@ This file is divided into 3 sections:
<customMessage>Use Javadoc style indentation for multiline comments</customMessage>
</check>
+ <check customId="OmitBracesInCase" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
+ <parameters><parameter name="regex">case[^\n>]*=>\s*\{</parameter></parameters>
+ <customMessage>Omit braces in case clauses.</customMessage>
+ </check>
+
<!-- ================================================================================ -->
<!-- rules we'd like to enforce, but haven't cleaned up the codebase yet -->
<!-- ================================================================================ -->
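A quick way to see what the new `OmitBracesInCase` regex flags, as a sketch that assumes only that scalastyle's `RegexChecker` reports any source text matching the pattern:

```scala
// The pattern added to scalastyle-config.xml above.
val rule = """case[^\n>]*=>\s*\{""".r

// Flagged: an opening brace directly follows the arrow of a case clause.
assert(rule.findFirstIn("case Some(x) => {").isDefined)

// Also flagged: because \s* matches newlines, a brace on the line after
// the arrow is caught too.
assert(rule.findFirstIn("case Some(x) =>\n  {").isDefined)

// Not flagged: a brace-less case body.
assert(rule.findFirstIn("case Some(x) =>\n  handle(x)").isEmpty)

// Note: `[^\n>]*` cannot cross a `>`, so the wildcard never skips past
// the case clause's own arrow.
```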
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index d842ffdc66..0f8876a9e6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -898,7 +898,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
val result = ctx.freshName("result")
val tmpRow = ctx.freshName("tmpRow")
- val fieldsEvalCode = fieldsCasts.zipWithIndex.map { case (cast, i) => {
+ val fieldsEvalCode = fieldsCasts.zipWithIndex.map { case (cast, i) =>
val fromFieldPrim = ctx.freshName("ffp")
val fromFieldNull = ctx.freshName("ffn")
val toFieldPrim = ctx.freshName("tfp")
@@ -920,7 +920,6 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
}
}
"""
- }
}.mkString("\n")
(c, evPrim, evNull) =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
index affd1bdb32..8d8cc152ff 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
@@ -97,11 +97,11 @@ class EquivalentExpressions {
def debugString(all: Boolean = false): String = {
val sb: mutable.StringBuilder = new StringBuilder()
sb.append("Equivalent expressions:\n")
- equivalenceMap.foreach { case (k, v) => {
+ equivalenceMap.foreach { case (k, v) =>
if (all || v.length > 1) {
sb.append(" " + v.mkString(", ")).append("\n")
}
- }}
+ }
sb.toString()
}
}
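The `EquivalentExpressions` hunk shows the same rule applied inside a partial-function literal: `foreach { case (k, v) => ... }` needs only the outer brace, so the old trailing `}}` collapses to a single `}`. A small sketch with made-up data:

```scala
// Hypothetical map standing in for equivalenceMap.
val groups = Map("a" -> Seq(1, 2), "b" -> Seq(3))
val sb = new StringBuilder

groups.foreach { case (k, v) =>
  // The case body is simply the rest of the block; with no inner braces,
  // the block closes with a single `}` instead of `}}`.
  if (v.length > 1) {
    sb.append(s"$k: ${v.mkString(", ")}").append("\n")
  }
}
```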
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala
index 8207d64798..711e870711 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala
@@ -196,12 +196,11 @@ object RandomDataGenerator {
case ShortType => randomNumeric[Short](
rand, _.nextInt().toShort, Seq(Short.MinValue, Short.MaxValue, 0.toShort))
case NullType => Some(() => null)
- case ArrayType(elementType, containsNull) => {
+ case ArrayType(elementType, containsNull) =>
forType(elementType, nullable = containsNull, rand).map {
elementGenerator => () => Seq.fill(rand.nextInt(MAX_ARR_SIZE))(elementGenerator())
}
- }
- case MapType(keyType, valueType, valueContainsNull) => {
+ case MapType(keyType, valueType, valueContainsNull) =>
for (
keyGenerator <- forType(keyType, nullable = false, rand);
valueGenerator <-
@@ -221,8 +220,7 @@ object RandomDataGenerator {
keys.zip(values).toMap
}
}
- }
- case StructType(fields) => {
+ case StructType(fields) =>
val maybeFieldGenerators: Seq[Option[() => Any]] = fields.map { field =>
forType(field.dataType, nullable = field.nullable, rand)
}
@@ -232,8 +230,7 @@ object RandomDataGenerator {
} else {
None
}
- }
- case udt: UserDefinedType[_] => {
+ case udt: UserDefinedType[_] =>
val maybeSqlTypeGenerator = forType(udt.sqlType, nullable, rand)
// Because random data generator at here returns scala value, we need to
// convert it to catalyst value to call udt's deserialize.
@@ -253,7 +250,6 @@ object RandomDataGenerator {
} else {
None
}
- }
case unsupportedType => None
}
// Handle nullability by wrapping the non-null value generator:
@@ -277,7 +273,7 @@ object RandomDataGenerator {
val fields = mutable.ArrayBuffer.empty[Any]
schema.fields.foreach { f =>
f.dataType match {
- case ArrayType(childType, nullable) => {
+ case ArrayType(childType, nullable) =>
val data = if (f.nullable && rand.nextFloat() <= PROBABILITY_OF_NULL) {
null
} else {
@@ -294,10 +290,8 @@ object RandomDataGenerator {
arr
}
fields += data
- }
- case StructType(children) => {
+ case StructType(children) =>
fields += randomRow(rand, StructType(children))
- }
case _ =>
val generator = RandomDataGenerator.forType(f.dataType, f.nullable, rand)
assert(generator.isDefined, "Unsupported type")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 86c6405522..e953a6e8ef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1153,14 +1153,12 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
private def verifyNonExchangingAgg(df: DataFrame) = {
var atFirstAgg: Boolean = false
df.queryExecution.executedPlan.foreach {
- case agg: TungstenAggregate => {
+ case agg: TungstenAggregate =>
atFirstAgg = !atFirstAgg
- }
- case _ => {
+ case _ =>
if (atFirstAgg) {
fail("Should not have operators between the two aggregations")
}
- }
}
}
@@ -1170,12 +1168,11 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
private def verifyExchangingAgg(df: DataFrame) = {
var atFirstAgg: Boolean = false
df.queryExecution.executedPlan.foreach {
- case agg: TungstenAggregate => {
+ case agg: TungstenAggregate =>
if (atFirstAgg) {
fail("Should not have back to back Aggregates")
}
atFirstAgg = true
- }
case e: ShuffleExchange => atFirstAgg = false
case _ =>
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index 8a551cd78c..31b63f2ce1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -612,23 +612,20 @@ class ColumnarBatchSuite extends SparkFunSuite {
val a2 = r2.getList(v._2).toArray
assert(a1.length == a2.length, "Seed = " + seed)
childType match {
- case DoubleType => {
+ case DoubleType =>
var i = 0
while (i < a1.length) {
assert(doubleEquals(a1(i).asInstanceOf[Double], a2(i).asInstanceOf[Double]),
"Seed = " + seed)
i += 1
}
- }
- case FloatType => {
+ case FloatType =>
var i = 0
while (i < a1.length) {
assert(doubleEquals(a1(i).asInstanceOf[Float], a2(i).asInstanceOf[Float]),
"Seed = " + seed)
i += 1
}
- }
-
case t: DecimalType =>
var i = 0
while (i < a1.length) {
@@ -640,7 +637,6 @@ class ColumnarBatchSuite extends SparkFunSuite {
}
i += 1
}
-
case _ => assert(a1 === a2, "Seed = " + seed)
}
case StructType(childFields) =>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index c40beeff97..58842f9c2f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -429,13 +429,12 @@ abstract class DStream[T: ClassTag] (
*/
private[streaming] def generateJob(time: Time): Option[Job] = {
getOrCompute(time) match {
- case Some(rdd) => {
+ case Some(rdd) =>
val jobFunc = () => {
val emptyFunc = { (iterator: Iterator[T]) => {} }
context.sparkContext.runJob(rdd, emptyFunc)
}
Some(new Job(time, jobFunc))
- }
case None => None
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index 431c9dbe2c..e73837eb96 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -109,10 +109,9 @@ class DStreamCheckpointData[T: ClassTag](dstream: DStream[T])
def restore() {
// Create RDDs from the checkpoint data
currentCheckpointFiles.foreach {
- case(time, file) => {
+ case(time, file) =>
logInfo("Restoring checkpointed RDD for time " + time + " from file '" + file + "'")
dstream.generatedRDDs += ((time, dstream.context.sparkContext.checkpointFile[T](file)))
- }
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 7fba2e8ec0..36f50e04db 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -333,14 +333,13 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
override def restore() {
hadoopFiles.toSeq.sortBy(_._1)(Time.ordering).foreach {
- case (t, f) => {
+ case (t, f) =>
// Restore the metadata in both files and generatedRDDs
logInfo("Restoring files for time " + t + " - " +
f.mkString("[", ", ", "]") )
batchTimeToSelectedFiles.synchronized { batchTimeToSelectedFiles += ((t, f)) }
recentlySelectedFiles ++= f
generatedRDDs += ((t, filesToRDD(f)))
- }
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
index 0379957e58..28aed0ca45 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
@@ -65,14 +65,12 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
// Try to get the previous state RDD
getOrCompute(validTime - slideDuration) match {
- case Some(prevStateRDD) => { // If previous state RDD exists
-
+ case Some(prevStateRDD) => // If previous state RDD exists
// Try to get the parent RDD
parent.getOrCompute(validTime) match {
- case Some(parentRDD) => { // If parent RDD exists, then compute as usual
+ case Some(parentRDD) => // If parent RDD exists, then compute as usual
computeUsingPreviousRDD(parentRDD, prevStateRDD)
- }
- case None => { // If parent RDD does not exist
+ case None => // If parent RDD does not exist
// Re-apply the update function to the old state RDD
val updateFuncLocal = updateFunc
@@ -82,17 +80,14 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
}
val stateRDD = prevStateRDD.mapPartitions(finalFunc, preservePartitioning)
Some(stateRDD)
- }
}
- }
-
- case None => { // If previous session RDD does not exist (first input data)
+ case None => // If previous session RDD does not exist (first input data)
// Try to get the parent RDD
parent.getOrCompute(validTime) match {
- case Some(parentRDD) => { // If parent RDD exists, then compute as usual
+ case Some(parentRDD) => // If parent RDD exists, then compute as usual
initialRDD match {
- case None => {
+ case None =>
// Define the function for the mapPartition operation on grouped RDD;
// first map the grouped tuple to tuples of required type,
// and then apply the update function
@@ -105,18 +100,13 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning)
// logDebug("Generating state RDD for time " + validTime + " (first)")
Some(sessionRDD)
- }
- case Some(initialStateRDD) => {
+ case Some(initialStateRDD) =>
computeUsingPreviousRDD(parentRDD, initialStateRDD)
- }
}
- }
- case None => { // If parent RDD does not exist, then nothing to do!
+ case None => // If parent RDD does not exist, then nothing to do!
// logDebug("Not generating state RDD (no previous state, no parent)")
None
- }
}
- }
}
}
}
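The `StateDStream` hunk is the most involved case: nested matches whose branches carry end-of-line comments. Omitting braces still works there; the comment simply follows the arrow. A stripped-down sketch with invented names, not the DStream API:

```scala
// Hypothetical lookups standing in for the state and parent RDDs.
def previousState(): Option[String] = Some("prev")
def parentValue(): Option[String] = None

val next: Option[String] = previousState() match {
  case Some(prev) => // previous state exists
    parentValue() match {
      case Some(parent) => Some(s"$prev+$parent") // combine as usual
      case None =>        // no new input: carry the old state forward
        Some(prev)
    }
  case None =>           // first batch: state comes from the parent alone
    parentValue()
}
```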
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index bd60059b18..cfcbdc7c38 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -538,10 +538,9 @@ class BasicOperationsSuite extends TestSuiteBase {
val stateObj = state.getOrElse(new StateObject)
values.sum match {
case 0 => stateObj.expireCounter += 1 // no new values
- case n => { // has new values, increment and reset expireCounter
+ case n => // has new values, increment and reset expireCounter
stateObj.counter += n
stateObj.expireCounter = 0
- }
}
stateObj.expireCounter match {
case 2 => None // seen twice with no new values, give it the boot
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index fbb25d4c59..bdbac64b9b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -267,10 +267,9 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
assert(!stateStream.checkpointData.currentCheckpointFiles.isEmpty,
"No checkpointed RDDs in state stream before first failure")
stateStream.checkpointData.currentCheckpointFiles.foreach {
- case (time, file) => {
+ case (time, file) =>
assert(fs.exists(new Path(file)), "Checkpoint file '" + file +"' for time " + time +
" for state stream before first failure does not exist")
- }
}
// Run till a further time such that previous checkpoint files in the stream would be deleted
@@ -297,10 +296,9 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
assert(!stateStream.checkpointData.currentCheckpointFiles.isEmpty,
"No checkpointed RDDs in state stream before second failure")
stateStream.checkpointData.currentCheckpointFiles.foreach {
- case (time, file) => {
+ case (time, file) =>
assert(fs.exists(new Path(file)), "Checkpoint file '" + file +"' for time " + time +
" for state stream before seconds failure does not exist")
- }
}
ssc.stop()
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
index 29bee4adf2..60c8e70235 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
@@ -382,11 +382,10 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
fs.rename(tempHadoopFile, hadoopFile)
done = true
} catch {
- case ioe: IOException => {
+ case ioe: IOException =>
fs = testDir.getFileSystem(new Configuration())
logWarning("Attempt " + tries + " at generating file " + hadoopFile + " failed.",
ioe)
- }
}
}
if (!done) {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 9e8453429c..d447a59937 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -374,7 +374,7 @@ private[spark] class ApplicationMaster(
failureCount = 0
} catch {
case i: InterruptedException =>
- case e: Throwable => {
+ case e: Throwable =>
failureCount += 1
// this exception was introduced in hadoop 2.4 and this code would not compile
// with earlier versions if we refer it directly.
@@ -390,7 +390,6 @@ private[spark] class ApplicationMaster(
} else {
logWarning(s"Reporter thread fails $failureCount time(s) in a row.", e)
}
- }
}
try {
val numPendingAllocate = allocator.getPendingAllocate.size
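The rule applies equally to `case` clauses inside `catch` handlers, as in the `MasterFailureTest` and `ApplicationMaster` hunks above. A minimal, hypothetical sketch:

```scala
import java.io.IOException

// Hypothetical retry body; readOnce is a stand-in, not Spark code.
def readOnce(): String = throw new IOException("transient failure")

var failureCount = 0
val result: Option[String] =
  try {
    Some(readOnce())
  } catch {
    case _: InterruptedException => None
    case e: IOException =>
      // Multi-statement handler body, written without braces.
      failureCount += 1
      println(s"attempt $failureCount failed: ${e.getMessage}")
      None
  }
```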
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index b0bfe855e9..23742eab62 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -148,11 +148,10 @@ private[yarn] class YarnAllocator(
classOf[Array[String]], classOf[Array[String]], classOf[Priority], classOf[Boolean],
classOf[String]))
} catch {
- case e: NoSuchMethodException => {
+ case e: NoSuchMethodException =>
logWarning(s"Node label expression $expr will be ignored because YARN version on" +
" classpath does not support it.")
None
- }
}
}
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 8720ee57fe..6b3c831e60 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -223,17 +223,15 @@ private[spark] abstract class YarnSchedulerBackend(
val lossReasonRequest = GetExecutorLossReason(executorId)
val future = am.ask[ExecutorLossReason](lossReasonRequest, askTimeout)
future onSuccess {
- case reason: ExecutorLossReason => {
+ case reason: ExecutorLossReason =>
driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, reason))
- }
}
future onFailure {
- case NonFatal(e) => {
+ case NonFatal(e) =>
logWarning(s"Attempted to get executor loss reason" +
s" for executor id ${executorId} at RPC address ${executorRpcAddress}," +
s" but got no response. Marking as slave lost.", e)
driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, SlaveLost()))
- }
case t => throw t
}
case None =>
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index de14e36f4e..fe09808ae5 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -101,22 +101,18 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
val modifyAcls = acls.get(ApplicationAccessType.MODIFY_APP)
viewAcls match {
- case Some(vacls) => {
+ case Some(vacls) =>
val aclSet = vacls.split(',').map(_.trim).toSet
assert(aclSet.contains(System.getProperty("user.name", "invalid")))
- }
- case None => {
+ case None =>
fail()
- }
}
modifyAcls match {
- case Some(macls) => {
+ case Some(macls) =>
val aclSet = macls.split(',').map(_.trim).toSet
assert(aclSet.contains(System.getProperty("user.name", "invalid")))
- }
- case None => {
+ case None =>
fail()
- }
}
}
@@ -135,26 +131,22 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
val modifyAcls = acls.get(ApplicationAccessType.MODIFY_APP)
viewAcls match {
- case Some(vacls) => {
+ case Some(vacls) =>
val aclSet = vacls.split(',').map(_.trim).toSet
assert(aclSet.contains("user1"))
assert(aclSet.contains("user2"))
assert(aclSet.contains(System.getProperty("user.name", "invalid")))
- }
- case None => {
+ case None =>
fail()
- }
}
modifyAcls match {
- case Some(macls) => {
+ case Some(macls) =>
val aclSet = macls.split(',').map(_.trim).toSet
assert(aclSet.contains("user3"))
assert(aclSet.contains("user4"))
assert(aclSet.contains(System.getProperty("user.name", "invalid")))
- }
- case None => {
+ case None =>
fail()
- }
}
}