diff options
Diffstat (limited to 'core/src')
35 files changed, 74 insertions, 141 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index f0d152f05a..966198dd5e 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -2397,9 +2397,8 @@ object SparkContext extends Logging { } catch { // TODO: Enumerate the exact reasons why it can fail // But irrespective of it, it means we cannot proceed ! - case e: Exception => { + case e: Exception => throw new SparkException("YARN mode not available ?", e) - } } val backend = try { val clazz = @@ -2407,9 +2406,8 @@ object SparkContext extends Logging { val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext]) cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend] } catch { - case e: Exception => { + case e: Exception => throw new SparkException("YARN mode not available ?", e) - } } scheduler.initialize(backend) (backend, scheduler) @@ -2421,9 +2419,8 @@ object SparkContext extends Logging { cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl] } catch { - case e: Exception => { + case e: Exception => throw new SparkException("YARN mode not available ?", e) - } } val backend = try { @@ -2432,9 +2429,8 @@ object SparkContext extends Logging { val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext]) cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend] } catch { - case e: Exception => { + case e: Exception => throw new SparkException("YARN mode not available ?", e) - } } scheduler.initialize(backend) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index ab89f4c4e4..3d11db7461 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -101,14 +101,13 @@ class SparkEnv ( // We only need to delete the tmp dir create by driver, because sparkFilesDir is point to the // current working dir in executor which we do not need to delete. driverTmpDirToDelete match { - case Some(path) => { + case Some(path) => try { Utils.deleteRecursively(new File(path)) } catch { case e: Exception => logWarning(s"Exception while deleting Spark temp dir: $path", e) } - } case None => // We just need to delete tmp dir created by driver, so do nothing on executor } } diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala index 6f6730690f..6259bead3e 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala @@ -134,11 +134,10 @@ private[python] class JavaToWritableConverter extends Converter[Any, Writable] { mapWritable.put(convertToWritable(k), convertToWritable(v)) } mapWritable - case array: Array[Any] => { + case array: Array[Any] => val arrayWriteable = new ArrayWritable(classOf[Writable]) arrayWriteable.set(array.map(convertToWritable(_))) arrayWriteable - } case other => throw new SparkException( s"Data of type ${other.getClass.getName} cannot be used") } diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 4bca16a234..ab5b6c8380 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -470,7 +470,7 @@ private[spark] object PythonRDD extends Logging { objs.append(obj) } } catch { - case eof: EOFException => {} + case eof: EOFException => // No-op } JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism)) } finally { diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 41ac308808..cda9d38c6a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -152,10 +152,9 @@ class SparkHadoopUtil extends Logging { val baselineBytesRead = f() Some(() => f() - baselineBytesRead) } catch { - case e @ (_: NoSuchMethodException | _: ClassNotFoundException) => { + case e @ (_: NoSuchMethodException | _: ClassNotFoundException) => logDebug("Couldn't find method for retrieving thread-level FileSystem input data", e) None - } } } @@ -174,10 +173,9 @@ class SparkHadoopUtil extends Logging { val baselineBytesWritten = f() Some(() => f() - baselineBytesWritten) } catch { - case e @ (_: NoSuchMethodException | _: ClassNotFoundException) => { + case e @ (_: NoSuchMethodException | _: ClassNotFoundException) => logDebug("Couldn't find method for retrieving thread-level FileSystem output data", e) None - } } } @@ -315,7 +313,7 @@ class SparkHadoopUtil extends Logging { */ def substituteHadoopVariables(text: String, hadoopConf: Configuration): String = { text match { - case HADOOP_CONF_PATTERN(matched) => { + case HADOOP_CONF_PATTERN(matched) => logDebug(text + " matched " + HADOOP_CONF_PATTERN) val key = matched.substring(13, matched.length() - 1) // remove ${hadoopconf- .. } val eval = Option[String](hadoopConf.get(key)) @@ -330,11 +328,9 @@ class SparkHadoopUtil extends Logging { // Continue to substitute more variables. substituteHadoopVariables(eval.get, hadoopConf) } - } - case _ => { + case _ => logDebug(text + " didn't match " + HADOOP_CONF_PATTERN) text - } } } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 01901bbf85..9bd3fc1033 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -217,7 +217,7 @@ private[deploy] class Master( } override def receive: PartialFunction[Any, Unit] = { - case ElectedLeader => { + case ElectedLeader => val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv) state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) { RecoveryState.ALIVE @@ -233,16 +233,14 @@ private[deploy] class Master( } }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS) } - } case CompleteRecovery => completeRecovery() - case RevokedLeadership => { + case RevokedLeadership => logError("Leadership has been revoked -- master shutting down.") System.exit(0) - } - case RegisterApplication(description, driver) => { + case RegisterApplication(description, driver) => // TODO Prevent repeated registrations from some driver if (state == RecoveryState.STANDBY) { // ignore, don't send response @@ -255,12 +253,11 @@ private[deploy] class Master( driver.send(RegisteredApplication(app.id, self)) schedule() } - } - case ExecutorStateChanged(appId, execId, state, message, exitStatus) => { + case ExecutorStateChanged(appId, execId, state, message, exitStatus) => val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId)) execOption match { - case Some(exec) => { + case Some(exec) => val appInfo = idToApp(appId) val oldState = exec.state exec.state = state @@ -298,22 +295,19 @@ private[deploy] class Master( } } } - } case None => logWarning(s"Got status update for unknown executor $appId/$execId") } - } - case DriverStateChanged(driverId, state, exception) => { + case DriverStateChanged(driverId, state, exception) => state match { case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED => removeDriver(driverId, state, exception) case _ => throw new Exception(s"Received unexpected state update for driver $driverId: $state") } - } - case Heartbeat(workerId, worker) => { + case Heartbeat(workerId, worker) => idToWorker.get(workerId) match { case Some(workerInfo) => workerInfo.lastHeartbeat = System.currentTimeMillis() @@ -327,9 +321,8 @@ private[deploy] class Master( " This worker was never registered, so ignoring the heartbeat.") } } - } - case MasterChangeAcknowledged(appId) => { + case MasterChangeAcknowledged(appId) => idToApp.get(appId) match { case Some(app) => logInfo("Application has been re-registered: " + appId) @@ -339,9 +332,8 @@ private[deploy] class Master( } if (canCompleteRecovery) { completeRecovery() } - } - case WorkerSchedulerStateResponse(workerId, executors, driverIds) => { + case WorkerSchedulerStateResponse(workerId, executors, driverIds) => idToWorker.get(workerId) match { case Some(worker) => logInfo("Worker has been re-registered: " + workerId) @@ -367,7 +359,6 @@ private[deploy] class Master( } if (canCompleteRecovery) { completeRecovery() } - } case WorkerLatestState(workerId, executors, driverIds) => idToWorker.get(workerId) match { @@ -397,9 +388,8 @@ private[deploy] class Master( logInfo(s"Received unregister request from application $applicationId") idToApp.get(applicationId).foreach(finishApplication) - case CheckForWorkerTimeOut => { + case CheckForWorkerTimeOut => timeOutDeadWorkers() - } case AttachCompletedRebuildUI(appId) => // An asyncRebuildSparkUI has completed, so need to attach to master webUi @@ -408,7 +398,7 @@ private[deploy] class Master( override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case RegisterWorker( - id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) => { + id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) => logInfo("Registering worker %s:%d with %d cores, %s RAM".format( workerHost, workerPort, cores, Utils.megabytesToString(memory))) if (state == RecoveryState.STANDBY) { @@ -430,9 +420,8 @@ private[deploy] class Master( + workerAddress)) } } - } - case RequestSubmitDriver(description) => { + case RequestSubmitDriver(description) => if (state != RecoveryState.ALIVE) { val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " + "Can only accept driver submissions in ALIVE state." @@ -451,9 +440,8 @@ private[deploy] class Master( context.reply(SubmitDriverResponse(self, true, Some(driver.id), s"Driver successfully submitted as ${driver.id}")) } - } - case RequestKillDriver(driverId) => { + case RequestKillDriver(driverId) => if (state != RecoveryState.ALIVE) { val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " + s"Can only kill drivers in ALIVE state." @@ -484,9 +472,8 @@ private[deploy] class Master( context.reply(KillDriverResponse(self, driverId, success = false, msg)) } } - } - case RequestDriverStatus(driverId) => { + case RequestDriverStatus(driverId) => if (state != RecoveryState.ALIVE) { val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " + "Can only request driver status in ALIVE state." @@ -501,18 +488,15 @@ private[deploy] class Master( context.reply(DriverStatusResponse(found = false, None, None, None, None)) } } - } - case RequestMasterState => { + case RequestMasterState => context.reply(MasterStateResponse( address.host, address.port, restServerBoundPort, workers.toArray, apps.toArray, completedApps.toArray, drivers.toArray, completedDrivers.toArray, state)) - } - case BoundPortsRequest => { + case BoundPortsRequest => context.reply(BoundPortsResponse(address.port, webUi.boundPort, restServerBoundPort)) - } case RequestExecutors(appId, requestedTotal) => context.reply(handleRequestExecutors(appId, requestedTotal)) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala index 9cd7458ba0..585e0839d0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala @@ -78,7 +78,7 @@ private[master] class MasterArguments(args: Array[String], conf: SparkConf) { case ("--help") :: tail => printUsageAndExit(0) - case Nil => {} + case Nil => // No-op case _ => printUsageAndExit(1) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala index 79f77212fe..af850e4871 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala @@ -70,11 +70,10 @@ private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serializer try { Some(serializer.newInstance().deserialize[T](ByteBuffer.wrap(fileData))) } catch { - case e: Exception => { + case e: Exception => logWarning("Exception while reading persisted file, deleting", e) zk.delete().forPath(WORKING_DIR + "/" + filename) None - } } } } diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala index b97805a28b..11e13441ee 100644 --- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala @@ -76,14 +76,13 @@ private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf: case ("--help") :: tail => printUsageAndExit(0) - case Nil => { + case Nil => if (masterUrl == null) { // scalastyle:off println System.err.println("--master is required") // scalastyle:on println printUsageAndExit(1) } - } case _ => printUsageAndExit(1) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index f9c92c3bb9..06066248ea 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -179,16 +179,14 @@ private[deploy] class ExecutorRunner( val message = "Command exited with code " + exitCode worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode))) } catch { - case interrupted: InterruptedException => { + case interrupted: InterruptedException => logInfo("Runner thread for executor " + fullId + " interrupted") state = ExecutorState.KILLED killProcess(None) - } - case e: Exception => { + case e: Exception => logError("Error running executor", e) state = ExecutorState.FAILED killProcess(Some(e.toString)) - } } } } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 1b7637a39c..449beb0811 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -480,7 +480,7 @@ private[deploy] class Worker( memoryUsed += memory_ sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None)) } catch { - case e: Exception => { + case e: Exception => logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e) if (executors.contains(appId + "/" + execId)) { executors(appId + "/" + execId).kill() @@ -488,7 +488,6 @@ private[deploy] class Worker( } sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED, Some(e.toString), None)) - } } } @@ -509,7 +508,7 @@ private[deploy] class Worker( } } - case LaunchDriver(driverId, driverDesc) => { + case LaunchDriver(driverId, driverDesc) => logInfo(s"Asked to launch driver $driverId") val driver = new DriverRunner( conf, @@ -525,9 +524,8 @@ private[deploy] class Worker( coresUsed += driverDesc.cores memoryUsed += driverDesc.mem - } - case KillDriver(driverId) => { + case KillDriver(driverId) => logInfo(s"Asked to kill driver $driverId") drivers.get(driverId) match { case Some(runner) => @@ -535,11 +533,9 @@ private[deploy] class Worker( case None => logError(s"Asked to kill unknown driver $driverId") } - } - case driverStateChanged @ DriverStateChanged(driverId, state, exception) => { + case driverStateChanged @ DriverStateChanged(driverId, state, exception) => handleDriverStateChanged(driverStateChanged) - } case ReregisterWithMaster => reregisterWithMaster() diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala index 391eb41190..777020d4d5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala @@ -165,12 +165,11 @@ private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) { } // scalastyle:on classforname } catch { - case e: Exception => { + case e: Exception => totalMb = 2*1024 // scalastyle:off println System.out.println("Failed to get total physical memory. Using " + totalMb + " MB") // scalastyle:on println - } } // Leave out 1 GB for the operating system, but don't return a negative memory size math.max(totalMb - 1024, Utils.DEFAULT_DRIVER_MEM_MB) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index d4ed5845e7..71b4ad160d 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -62,10 +62,9 @@ private[spark] class CoarseGrainedExecutorBackend( // This is a very fast action so we can use "ThreadUtils.sameThread" case Success(msg) => // Always receive `true`. Just ignore it - case Failure(e) => { + case Failure(e) => logError(s"Cannot register with driver: $driverUrl", e) System.exit(1) - } }(ThreadUtils.sameThread) } diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index 4da1017d28..0fed991049 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -196,10 +196,9 @@ private[spark] class MetricsSystem private ( sinks += sink.asInstanceOf[Sink] } } catch { - case e: Exception => { + case e: Exception => logError("Sink class " + classPath + " cannot be instantiated") throw e - } } } } diff --git a/core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala b/core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala index c562c70aba..ab6aba6fc7 100644 --- a/core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala +++ b/core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala @@ -32,12 +32,11 @@ class BoundedDouble(val mean: Double, val confidence: Double, val low: Double, v */ override def equals(that: Any): Boolean = that match { - case that: BoundedDouble => { + case that: BoundedDouble => this.mean == that.mean && this.confidence == that.confidence && this.low == that.low && this.high == that.high - } case _ => false } } diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala index 5e9230e733..368916a39e 100644 --- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala @@ -166,8 +166,8 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable { val counters = new Array[Long](buckets.length - 1) while (iter.hasNext) { bucketFunction(iter.next()) match { - case Some(x: Int) => {counters(x) += 1} - case _ => {} + case Some(x: Int) => counters(x) += 1 + case _ => // No-Op } } Iterator(counters) diff --git a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala index 363004e587..a5992022d0 100644 --- a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala @@ -86,12 +86,11 @@ class OrderedRDDFunctions[K : Ordering : ClassTag, def inRange(k: K): Boolean = ordering.gteq(k, lower) && ordering.lteq(k, upper) val rddToFilter: RDD[P] = self.partitioner match { - case Some(rp: RangePartitioner[K, V]) => { + case Some(rp: RangePartitioner[K, V]) => val partitionIndicies = (rp.getPartition(lower), rp.getPartition(upper)) match { case (l, u) => Math.min(l, u) to Math.max(l, u) } PartitionPruningRDD.create(self, partitionIndicies.contains) - } case _ => self } diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala index 582fa93afe..462fb39ea2 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala @@ -128,7 +128,7 @@ private object ParallelCollectionRDD { }) } seq match { - case r: Range => { + case r: Range => positions(r.length, numSlices).zipWithIndex.map({ case ((start, end), index) => // If the range is inclusive, use inclusive range for the last slice if (r.isInclusive && index == numSlices - 1) { @@ -138,8 +138,7 @@ private object ParallelCollectionRDD { new Range(r.start + start * r.step, r.start + end * r.step, r.step) } }).toSeq.asInstanceOf[Seq[Seq[T]]] - } - case nr: NumericRange[_] => { + case nr: NumericRange[_] => // For ranges of Long, Double, BigInteger, etc val slices = new ArrayBuffer[Seq[T]](numSlices) var r = nr @@ -149,14 +148,12 @@ private object ParallelCollectionRDD { r = r.drop(sliceSize) } slices - } - case _ => { + case _ => val array = seq.toArray // To prevent O(n^2) operations for List etc positions(array.length, numSlices).map({ case (start, end) => array.slice(start, end).toSeq }).toSeq - } } } } diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala index 9e3880714a..c3579d761d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala @@ -78,11 +78,10 @@ class PartitionerAwareUnionRDD[T: ClassTag]( logDebug("Finding preferred location for " + this + ", partition " + s.index) val parentPartitions = s.asInstanceOf[PartitionerAwareUnionRDDPartition].parents val locations = rdds.zip(parentPartitions).flatMap { - case (rdd, part) => { + case (rdd, part) => val parentLocations = currPrefLocs(rdd, part) logDebug("Location of " + rdd + " partition " + part.index + " = " + parentLocations) parentLocations - } } val location = if (locations.isEmpty) { None diff --git a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala index 0640f26051..a6b032cc00 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala @@ -57,11 +57,10 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl // Since we are not doing canonicalization of path, this can be wrong : like relative vs // absolute path .. which is fine, this is best case effort to remove duplicates - right ? override def equals(other: Any): Boolean = other match { - case that: InputFormatInfo => { + case that: InputFormatInfo => // not checking config - that should be fine, right ? this.inputFormatClazz == that.inputFormatClazz && this.path == that.path - } case _ => false } @@ -86,10 +85,9 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl } } catch { - case e: ClassNotFoundException => { + case e: ClassNotFoundException => throw new IllegalArgumentException("Specified inputformat " + inputFormatClazz + " cannot be found ?", e) - } } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala index 6e9337bb90..bc1431835e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala @@ -49,14 +49,13 @@ class SplitInfo( // So unless there is identity equality between underlyingSplits, it will always fail even if it // is pointing to same block. override def equals(other: Any): Boolean = other match { - case that: SplitInfo => { + case that: SplitInfo => this.hostLocation == that.hostLocation && this.inputFormatClazz == that.inputFormatClazz && this.path == that.path && this.length == that.length && // other split specific checks (like start for FileSplit) this.underlyingSplit == that.underlyingSplit - } case _ => false } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala index 873f1b56bd..ae7ef46abb 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala @@ -133,7 +133,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul // if we can't deserialize the reason. logError( "Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader) - case ex: Exception => {} + case ex: Exception => // No-op } scheduler.handleFailedTask(taskSetManager, tid, taskState, reason) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 15d3515a02..6e08cdd87a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -188,20 +188,18 @@ private[spark] class TaskSetManager( loc match { case e: ExecutorCacheTaskLocation => pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index - case e: HDFSCacheTaskLocation => { + case e: HDFSCacheTaskLocation => val exe = sched.getExecutorsAliveOnHost(loc.host) exe match { - case Some(set) => { + case Some(set) => for (e <- set) { pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index } logInfo(s"Pending task $index has a cached location at ${e.host} " + ", where there are executors " + set.mkString(",")) - } case None => logDebug(s"Pending task $index has a cached location at ${e.host} " + ", but there are no executors alive there.") } - } case _ => } pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index @@ -437,7 +435,7 @@ private[spark] class TaskSetManager( } dequeueTask(execId, host, allowedLocality) match { - case Some((index, taskLocality, speculative)) => { + case Some((index, taskLocality, speculative)) => // Found a task; do some bookkeeping and return a task description val task = tasks(index) val taskId = sched.newTaskId() @@ -486,7 +484,6 @@ private[spark] class TaskSetManager( sched.dagScheduler.taskStarted(task, info) return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId, taskName, index, serializedTask)) - } case _ => } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala index 3971e6c382..61ab3e87c5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala @@ -121,11 +121,10 @@ private[spark] class ZookeeperMesosClusterPersistenceEngine( Some(Utils.deserialize[T](fileData)) } catch { case e: NoNodeException => None - case e: Exception => { + case e: Exception => logWarning("Exception while reading persisted file, deleting", e) zk.delete().forPath(zkPath) None - } } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala index 374c79a7e5..1b7ac172de 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala @@ -55,11 +55,10 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging { Some(vol.setContainerPath(container_path) .setHostPath(host_path) .setMode(Volume.Mode.RO)) - case spec => { + case spec => logWarning(s"Unable to parse volume specs: $volumes. " + "Expected form: \"[host-dir:]container-dir[:rw|:ro](, ...)\"") None - } } } .map { _.build() } @@ -90,11 +89,10 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging { Some(portmap.setHostPort(host_port.toInt) .setContainerPort(container_port.toInt) .setProtocol(protocol)) - case spec => { + case spec => logWarning(s"Unable to parse port mapping specs: $portmaps. " + "Expected form: \"host_port:container_port[:udp|:tcp](, ...)\"") None - } } } .map { _.build() } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala index 233bdc23e6..7295d50682 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala @@ -124,11 +124,10 @@ private[mesos] trait MesosSchedulerUtils extends Logging { markErr() } } catch { - case e: Exception => { + case e: Exception => logError("driver.run() failed", e) error = Some(e) markErr() - } } } }.start() @@ -184,7 +183,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging { var remain = amountToUse var requestedResources = new ArrayBuffer[Resource] val remainingResources = resources.asScala.map { - case r => { + case r => if (remain > 0 && r.getType == Value.Type.SCALAR && r.getScalar.getValue > 0.0 && @@ -196,7 +195,6 @@ private[mesos] trait MesosSchedulerUtils extends Logging { } else { r } - } } // Filter any resource that has depleted. diff --git a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala index 5ead40e89e..cb95246d5b 100644 --- a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala @@ -188,10 +188,9 @@ abstract class DeserializationStream { try { (readKey[Any](), readValue[Any]()) } catch { - case eof: EOFException => { + case eof: EOFException => finished = true null - } } } diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index 25edb9f1e4..4ec5b4bbb0 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -143,13 +143,12 @@ final class ShuffleBlockFetcherIterator( while (iter.hasNext) { val result = iter.next() result match { - case SuccessFetchResult(_, address, _, buf, _) => { + case SuccessFetchResult(_, address, _, buf, _) => if (address != blockManager.blockManagerId) { shuffleMetrics.incRemoteBytesRead(buf.size) shuffleMetrics.incRemoteBlocksFetched(1) } buf.release() - } case _ => } } @@ -313,7 +312,7 @@ final class ShuffleBlockFetcherIterator( shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait) result match { - case SuccessFetchResult(_, address, size, buf, isNetworkReqDone) => { + case SuccessFetchResult(_, address, size, buf, isNetworkReqDone) => if (address != blockManager.blockManagerId) { shuffleMetrics.incRemoteBytesRead(buf.size) shuffleMetrics.incRemoteBlocksFetched(1) @@ -323,7 +322,6 @@ final class ShuffleBlockFetcherIterator( reqsInFlight -= 1 logDebug("Number of requests in flight " + reqsInFlight) } - } case _ => } // Send fetch requests up to maxBytesInFlight diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala index cc476d61b5..a0ef80d9bd 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala @@ -38,7 +38,7 @@ private[ui] class ExecutorThreadDumpPage(parent: ExecutorsTab) extends WebUIPage val content = maybeThreadDump.map { threadDump => val dumpRows = threadDump.sortWith { - case (threadTrace1, threadTrace2) => { + case (threadTrace1, threadTrace2) => val v1 = if (threadTrace1.threadName.contains("Executor task launch")) 1 else 0 val v2 = if (threadTrace2.threadName.contains("Executor task launch")) 1 else 0 if (v1 == v2) { @@ -46,7 +46,6 @@ private[ui] class ExecutorThreadDumpPage(parent: ExecutorsTab) extends WebUIPage } else { v1 > v2 } - } }.map { thread => val threadId = thread.threadId <tr id={s"thread_${threadId}_tr"} class="accordion-heading" diff --git a/core/src/main/scala/org/apache/spark/util/EventLoop.scala b/core/src/main/scala/org/apache/spark/util/EventLoop.scala index 153025cef2..3ea9139e11 100644 --- a/core/src/main/scala/org/apache/spark/util/EventLoop.scala +++ b/core/src/main/scala/org/apache/spark/util/EventLoop.scala @@ -47,13 +47,12 @@ private[spark] abstract class EventLoop[E](name: String) extends Logging { try { onReceive(event) } catch { - case NonFatal(e) => { + case NonFatal(e) => try { onError(e) } catch { case NonFatal(e) => logError("Unexpected error in " + name, e) } - } } } } catch { diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala index 3f627a0145..6861a75612 100644 --- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala +++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala @@ -151,13 +151,12 @@ object SizeEstimator extends Logging { // TODO: We could use reflection on the VMOption returned ? getVMMethod.invoke(bean, "UseCompressedOops").toString.contains("true") } catch { - case e: Exception => { + case e: Exception => // Guess whether they've enabled UseCompressedOops based on whether maxMemory < 32 GB val guess = Runtime.getRuntime.maxMemory < (32L*1024*1024*1024) val guessInWords = if (guess) "yes" else "not" logWarning("Failed to check whether UseCompressedOops is set; assuming " + guessInWords) return guess - } } } diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala index 67d722c1dc..2110d3d770 100644 --- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala +++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala @@ -320,7 +320,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex Thread.sleep(200) } } catch { - case _: Throwable => { Thread.sleep(10) } + case _: Throwable => Thread.sleep(10) // Do nothing. We might see exceptions because block manager // is racing this thread to remove entries from the driver. } diff --git a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala index 3706455c3f..8feb3dee05 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala @@ -82,20 +82,18 @@ package object testPackage extends Assertions { val curCallSite = sc.getCallSite().shortForm // note: 2 lines after definition of "rdd" val rddCreationLine = rddCreationSite match { - case CALL_SITE_REGEX(func, file, line) => { + case CALL_SITE_REGEX(func, file, line) => assert(func === "makeRDD") assert(file === "SparkContextInfoSuite.scala") line.toInt - } case _ => fail("Did not match expected call site format") } curCallSite match { - case CALL_SITE_REGEX(func, file, line) => { + case CALL_SITE_REGEX(func, file, line) => assert(func === "getCallSite") // this is correct because we called it from outside of Spark assert(file === "SparkContextInfoSuite.scala") assert(line.toInt === rddCreationLine.toInt + 2) - } case _ => fail("Did not match expected call site format") } } diff --git a/core/src/test/scala/org/apache/spark/UnpersistSuite.scala b/core/src/test/scala/org/apache/spark/UnpersistSuite.scala index f7a13ab399..09e21646ee 100644 --- a/core/src/test/scala/org/apache/spark/UnpersistSuite.scala +++ b/core/src/test/scala/org/apache/spark/UnpersistSuite.scala @@ -35,7 +35,7 @@ class UnpersistSuite extends SparkFunSuite with LocalSparkContext { Thread.sleep(200) } } catch { - case _: Throwable => { Thread.sleep(10) } + case _: Throwable => Thread.sleep(10) // Do nothing. We might see exceptions because block manager // is racing this thread to remove entries from the driver. } diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala index 43e61241b6..cebac2097f 100644 --- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala +++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala @@ -127,9 +127,8 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { override val rpcEnv = env override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { - case msg: String => { + case msg: String => context.reply(msg) - } } }) val reply = rpcEndpointRef.askWithRetry[String]("hello") @@ -141,9 +140,8 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { override val rpcEnv = env override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { - case msg: String => { + case msg: String => context.reply(msg) - } } }) @@ -164,10 +162,9 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { override val rpcEnv = env override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { - case msg: String => { + case msg: String => Thread.sleep(100) context.reply(msg) - } } }) @@ -317,10 +314,9 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { override val rpcEnv = env override def receive: PartialFunction[Any, Unit] = { - case m => { + case m => self callSelfSuccessfully = true - } } }) @@ -682,9 +678,8 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { override val rpcEnv = localEnv override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { - case msg: String => { + case msg: String => context.reply(msg) - } } }) val rpcEndpointRef = remoteEnv.setupEndpointRef(localEnv.address, "ask-authentication") |