aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLian, Cheng <rhythm.mail@gmail.com>2013-11-11 12:21:54 +0800
committerLian, Cheng <rhythm.mail@gmail.com>2013-11-11 12:21:54 +0800
commite2a43b3dcce81fc99098510d09095e1be4bf3e29 (patch)
treed4f1d828a94c41d528c7e8ae0384b905398d90b7
parentba552851771cf8eaf90b72b661c3df60080d0ef9 (diff)
downloadspark-e2a43b3dcce81fc99098510d09095e1be4bf3e29.tar.gz
spark-e2a43b3dcce81fc99098510d09095e1be4bf3e29.tar.bz2
spark-e2a43b3dcce81fc99098510d09095e1be4bf3e29.zip
Made some changes according to suggestions from @aarondav
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala9
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala2
2 files changed, 6 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 74995706a8..42bb3884c8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -112,9 +112,10 @@ class DAGScheduler(
private val eventProcessActor: ActorRef = env.actorSystem.actorOf(Props(new Actor {
override def preStart() {
- env.actorSystem.scheduler.schedule(RESUBMIT_TIMEOUT milliseconds, RESUBMIT_TIMEOUT milliseconds) {
- if (failed.size > 0)
+ context.system.scheduler.schedule(RESUBMIT_TIMEOUT milliseconds, RESUBMIT_TIMEOUT milliseconds) {
+ if (failed.size > 0) {
resubmitFailedStages()
+ }
}
}
@@ -853,7 +854,7 @@ class DAGScheduler(
// If the RDD has narrow dependencies, pick the first partition of the first narrow dep
// that has any placement preferences. Ideally we would choose based on transfer sizes,
// but this will do for now.
- rdd.dependencies.foreach(_ match {
+ rdd.dependencies.foreach {
case n: NarrowDependency[_] =>
for (inPart <- n.getParents(partition)) {
val locs = getPreferredLocs(n.rdd, inPart)
@@ -861,7 +862,7 @@ class DAGScheduler(
return locs
}
case _ =>
- })
+ }
Nil
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 2c21134393..702aca8323 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -891,7 +891,7 @@ private[spark] object BlockManager extends Logging {
blockManagerMaster: BlockManagerMaster = null)
: Map[BlockId, Seq[BlockManagerId]] =
{
- // env == null and blockManagerMaster != null is used in tests
+ // blockManagerMaster != null is used in tests
assert (env != null || blockManagerMaster != null)
val blockLocations: Seq[Seq[BlockManagerId]] = if (blockManagerMaster == null) {
env.blockManager.getLocationBlockIds(blockIds)