diff options
author | Kay Ousterhout <kayousterhout@gmail.com> | 2013-12-27 12:19:38 -0800 |
---|---|---|
committer | Kay Ousterhout <kayousterhout@gmail.com> | 2013-12-27 12:19:38 -0800 |
commit | 0c71ffe924a158608b1760477b883e4818d53af4 (patch) | |
tree | dd6aecb255473e9a3dae9d70f75e780263ef5225 | |
parent | 8c81068e16d4485e7f35dfaf99de6ee99fd76678 (diff) | |
download | spark-0c71ffe924a158608b1760477b883e4818d53af4.tar.gz spark-0c71ffe924a158608b1760477b883e4818d53af4.tar.bz2 spark-0c71ffe924a158608b1760477b883e4818d53af4.zip |
Style fixes as per Reynold's review
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 12 |
1 files changed, 6 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 2a131fde28..c48a3d64ef 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -361,8 +361,8 @@ class DAGScheduler( stageIdToJobIds.getOrElseUpdate(s.id, new HashSet[Int]()) += jobId jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]()) += s.id val parents = getParentStages(s.rdd, jobId) - val parentsWithoutThisJobId = parents.filter( - p => !stageIdToJobIds.get(p.id).exists(_.contains(jobId))) + val parentsWithoutThisJobId = parents.filter(p => + !stageIdToJobIds.get(p.id).exists(_.contains(jobId))) updateJobIdStageIdMapsList(parentsWithoutThisJobId ++ stages.tail) } } @@ -395,8 +395,8 @@ class DAGScheduler( running -= s } stageToInfos -= s - shuffleToMapStage.keys.filter(shuffleToMapStage(_) == s).foreach( - shuffleToMapStage.remove) + shuffleToMapStage.keys.filter(shuffleToMapStage(_) == s).foreach(shuffleId => + shuffleToMapStage.remove(shuffleId)) if (pendingTasks.contains(s) && !pendingTasks(s).isEmpty) { logDebug("Removing pending status for stage %d".format(stageId)) } @@ -573,8 +573,8 @@ class DAGScheduler( case JobGroupCancelled(groupId) => // Cancel all jobs belonging to this job group. // First finds all active jobs with this group id, and then kill stages for them. - val activeInGroup = activeJobs.filter( - groupId == _.properties.get(SparkContext.SPARK_JOB_GROUP_ID)) + val activeInGroup = activeJobs.filter(activeJob => + groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID)) val jobIds = activeInGroup.map(_.jobId) jobIds.foreach { handleJobCancellation } |