aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorKay Ousterhout <kayousterhout@gmail.com>2013-12-27 12:19:38 -0800
committerKay Ousterhout <kayousterhout@gmail.com>2013-12-27 12:19:38 -0800
commit0c71ffe924a158608b1760477b883e4818d53af4 (patch)
treedd6aecb255473e9a3dae9d70f75e780263ef5225 /core
parent8c81068e16d4485e7f35dfaf99de6ee99fd76678 (diff)
downloadspark-0c71ffe924a158608b1760477b883e4818d53af4.tar.gz
spark-0c71ffe924a158608b1760477b883e4818d53af4.tar.bz2
spark-0c71ffe924a158608b1760477b883e4818d53af4.zip
Style fixes as per Reynold's review
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala12
1 files changed, 6 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 2a131fde28..c48a3d64ef 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -361,8 +361,8 @@ class DAGScheduler(
stageIdToJobIds.getOrElseUpdate(s.id, new HashSet[Int]()) += jobId
jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]()) += s.id
val parents = getParentStages(s.rdd, jobId)
- val parentsWithoutThisJobId = parents.filter(
- p => !stageIdToJobIds.get(p.id).exists(_.contains(jobId)))
+ val parentsWithoutThisJobId = parents.filter(p =>
+ !stageIdToJobIds.get(p.id).exists(_.contains(jobId)))
updateJobIdStageIdMapsList(parentsWithoutThisJobId ++ stages.tail)
}
}
@@ -395,8 +395,8 @@ class DAGScheduler(
running -= s
}
stageToInfos -= s
- shuffleToMapStage.keys.filter(shuffleToMapStage(_) == s).foreach(
- shuffleToMapStage.remove)
+ shuffleToMapStage.keys.filter(shuffleToMapStage(_) == s).foreach(shuffleId =>
+ shuffleToMapStage.remove(shuffleId))
if (pendingTasks.contains(s) && !pendingTasks(s).isEmpty) {
logDebug("Removing pending status for stage %d".format(stageId))
}
@@ -573,8 +573,8 @@ class DAGScheduler(
case JobGroupCancelled(groupId) =>
// Cancel all jobs belonging to this job group.
// First finds all active jobs with this group id, and then kill stages for them.
- val activeInGroup = activeJobs.filter(
- groupId == _.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
+ val activeInGroup = activeJobs.filter(activeJob =>
+ groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
val jobIds = activeInGroup.map(_.jobId)
jobIds.foreach { handleJobCancellation }