commit 54862af5ee813030ead80ec097f48620ddb974fc
tree   f6d2c86ab7611ae6407bbc4386d82a89f27b4881
parent dca946ff6779cda9c50ae0069734b7802bb4d24a
Author:    Prashant Sharma <prashant.s@imaginea.com>  2013-11-27 14:26:28 +0530
Committer: Prashant Sharma <prashant.s@imaginea.com>  2013-11-27 14:26:28 +0530
Improvements from the review comments and followed the Boy Scout Rule.
Diffstat (limited to 'core')
 core/src/main/scala/org/apache/spark/MapOutputTracker.scala                                 |  8
 core/src/main/scala/org/apache/spark/deploy/master/Master.scala                             |  8
 core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala                             |  9
 core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala                           |  2
 core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala               |  4
 core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala  |  5
 core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala                       | 15
 core/src/main/scala/org/apache/spark/util/AkkaUtils.scala                                   |  5
 8 files changed, 21 insertions(+), 35 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index a686b534da..88a7f24884 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -21,15 +21,11 @@ import java.io._
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import scala.collection.mutable.HashSet
+import scala.concurrent.Await
+import scala.concurrent.duration._
import akka.actor._
-import scala.concurrent.Await
import akka.pattern.ask
-import akka.remote._
-
-import scala.concurrent.duration.Duration
-import akka.util.Timeout
-import scala.concurrent.duration._
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage.BlockManagerId
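
The hunk above groups the scala.concurrent imports ahead of the akka ones and drops duplicates; those imports back the ask/Await pattern the tracker uses to query map output locations. A minimal standalone sketch of that pattern, with TrackerActor and GetStatus as made-up stand-ins rather than Spark classes:

    import scala.concurrent.Await
    import scala.concurrent.duration._

    import akka.actor._
    import akka.pattern.ask
    import akka.util.Timeout

    case object GetStatus

    class TrackerActor extends Actor {
      def receive = { case GetStatus => sender ! "ok" }
    }

    object AskExample extends App {
      val system = ActorSystem("demo")
      val tracker = system.actorOf(Props[TrackerActor], "tracker")
      implicit val timeout = Timeout(10.seconds)    // 10.seconds needs duration._ in scope
      // Ask, then block for the reply, as the tracker does on the fetch path.
      val answer = Await.result(tracker ? GetStatus, timeout.duration)
      println(answer)
      system.shutdown()
    }
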
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 0e2b461b13..c627dd3806 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -39,9 +39,6 @@ import org.apache.spark.deploy.master.MasterMessages._
import org.apache.spark.deploy.master.ui.MasterWebUI
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.{Utils, AkkaUtils}
-import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
-import org.apache.spark.deploy.DeployMessages.KillExecutor
-import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
import context.dispatcher
@@ -159,7 +156,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
System.exit(0)
}
- case RegisterWorker(id, host, workerPort, cores, memory, webUiPort, publicAddress) => {
+ case RegisterWorker(id, workerHost, workerPort, cores, memory, workerWebUiPort, publicAddress) => {
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
host, workerPort, cores, Utils.megabytesToString(memory)))
if (state == RecoveryState.STANDBY) {
@@ -167,7 +164,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
} else if (idToWorker.contains(id)) {
sender ! RegisterWorkerFailed("Duplicate worker ID")
} else {
- val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress)
+ val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
+ sender, workerWebUiPort, publicAddress)
registerWorker(worker)
persistenceEngine.addWorker(worker)
sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
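
The rename in this hunk matters because the old pattern variables host and webUiPort shadowed the Master constructor parameters of the same names, so references inside the case silently bound to the sender's values (note that the unchanged logInfo line now resolves host to the Master's own field rather than the worker's, exactly the ambiguity distinct names make visible). A contrived sketch of the pitfall, with made-up Listener/Register names rather than Spark code:

    import akka.actor._

    case class Register(host: String, port: Int)

    class Listener(host: String, port: Int) extends Actor {
      def receive = {
        // Shadowed: these pattern variables would hide the constructor
        // parameters, so `host`/`port` in the body would mean the sender's.
        //   case Register(host, port) => ...

        // Distinct names keep both scopes addressable and the intent explicit.
        case Register(remoteHost, remotePort) =>
          println(s"listener at $host:$port saw registration from $remoteHost:$remotePort")
      }
    }
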
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 0a183afd8e..808b54c0af 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -27,7 +27,7 @@ import scala.concurrent.duration._
import akka.actor._
import akka.remote.{ DisassociatedEvent, RemotingLifecycleEvent}
-import org.apache.spark.Logging
+import org.apache.spark.{SparkException, Logging}
import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
@@ -73,6 +73,7 @@ private[spark] class Worker(
val masterLock: Object = new Object()
var master: ActorSelection = null
+ var masterAddress: Address = null
var activeMasterUrl: String = ""
var activeMasterWebUiUrl : String = ""
@volatile var registered = false
@@ -136,6 +137,10 @@ private[spark] class Worker(
activeMasterUrl = url
activeMasterWebUiUrl = uiUrl
master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
+ masterAddress = activeMasterUrl match {
+ case Master.sparkUrlRegex(_host, _port) => Address("akka.tcp", Master.systemName, _host, _port.toInt)
+ case x => throw new SparkException("Invalid spark URL:"+x)
+ }
connected = true
}
}
@@ -240,7 +245,7 @@ private[spark] class Worker(
}
}
- case x: DisassociatedEvent =>
+ case x: DisassociatedEvent if x.remoteAddress == masterAddress =>
logInfo(s"$x Disassociated !")
masterDisconnected()
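
Caching masterAddress lets the worker react only to DisassociatedEvents that concern its master instead of any remote peer. A sketch of the URL-to-Address translation, where the regex and system name approximate what Master.sparkUrlRegex and Master.systemName plausibly are (assumptions, not the verbatim Spark definitions):

    import akka.actor.Address

    // Approximation of Master.sparkUrlRegex: matches spark://host:port
    val sparkUrlRegex = """spark://([^:]+):(\d+)""".r
    val systemName = "sparkMaster"   // assumed value of Master.systemName

    def toMasterAddress(url: String): Address = url match {
      case sparkUrlRegex(host, port) => Address("akka.tcp", systemName, host, port.toInt)
      case x => throw new IllegalArgumentException("Invalid spark URL: " + x)
    }

    println(toMasterAddress("spark://localhost:7077"))
    // -> akka.tcp://sparkMaster@localhost:7077

    // With the address cached, the guard in the hunk ignores unrelated peers:
    //   case x: DisassociatedEvent if x.remoteAddress == masterAddress => masterDisconnected()
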
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index fdea3f6f88..773e9ec182 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -22,7 +22,6 @@ import java.util.Properties
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
-import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.reflect.ClassTag
@@ -115,6 +114,7 @@ class DAGScheduler(
private val eventProcessActor: ActorRef = env.actorSystem.actorOf(Props(new Actor {
override def preStart() {
+ import context.dispatcher
context.system.scheduler.schedule(RESUBMIT_TIMEOUT, RESUBMIT_TIMEOUT) {
if (failed.size > 0) {
resubmitFailedStages()
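
Dropping ExecutionContext.Implicits.global in favour of import context.dispatcher keeps the resubmit timer on the actor system's own thread pool instead of the JVM-global pool. A minimal sketch of the idiom, with Resubmitter as a made-up actor:

    import scala.concurrent.duration._
    import akka.actor._

    class Resubmitter extends Actor {
      override def preStart() {
        // scheduler.schedule needs an implicit ExecutionContext; the actor's
        // dispatcher keeps the periodic work on the system's dispatcher.
        import context.dispatcher
        context.system.scheduler.schedule(10.seconds, 10.seconds) {
          self ! "tick"
        }
      }
      def receive = { case "tick" => println("resubmit check") }
    }
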
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index 7c9d6a93e4..8056cb2597 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -24,8 +24,6 @@ import java.util.{TimerTask, Timer}
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
-
-import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import org.apache.spark._
@@ -123,7 +121,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
if (System.getProperty("spark.speculation", "false").toBoolean) {
logInfo("Starting speculative execution thread")
-
+ import sc.env.actorSystem.dispatcher
sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
SPECULATION_INTERVAL milliseconds) {
checkSpeculatableTasks()
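
The same substitution appears here outside an actor: with no context available, the scheduler borrows the ActorSystem's dispatcher as the implicit ExecutionContext. A brief sketch with illustrative names:

    import scala.concurrent.duration._
    import akka.actor.ActorSystem

    object SpeculationLoop extends App {
      val system = ActorSystem("demo")
      import system.dispatcher   // implicit ExecutionContext, as the diff gets via sc.env.actorSystem
      system.scheduler.schedule(100.milliseconds, 100.milliseconds) {
        println("checking for speculatable tasks")
      }
    }
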
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index d614dcbdd8..f5e8766f6d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -25,7 +25,7 @@ import scala.concurrent.duration._
import akka.actor._
import akka.pattern.ask
-import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}
+import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import org.apache.spark.{SparkException, Logging, TaskState}
import org.apache.spark.scheduler.TaskDescription
@@ -52,7 +52,6 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
private val executorAddress = new HashMap[String, Address]
private val executorHost = new HashMap[String, String]
private val freeCores = new HashMap[String, Int]
- private val actorToExecutorId = new HashMap[ActorRef, String]
private val addressToExecutorId = new HashMap[Address, String]
override def preStart() {
@@ -77,7 +76,6 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
executorHost(executorId) = Utils.parseHostPort(hostPort)._1
freeCores(executorId) = cores
executorAddress(executorId) = sender.path.address
- actorToExecutorId(sender) = executorId
addressToExecutorId(sender.path.address) = executorId
totalCoreCount.addAndGet(cores)
makeOffers()
@@ -147,7 +145,6 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
if (executorActor.contains(executorId)) {
logInfo("Executor " + executorId + " disconnected, so removing it")
val numCores = freeCores(executorId)
- actorToExecutorId -= executorActor(executorId)
addressToExecutorId -= executorAddress(executorId)
executorActor -= executorId
executorHost -= executorId
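
actorToExecutorId duplicated information already recoverable from addressToExecutorId: every DisassociatedEvent carries the remote Address, so one reverse index suffices. A sketch of the remaining lookup path, with hypothetical surrounding names:

    import scala.collection.mutable.HashMap

    import akka.actor.Address
    import akka.remote.DisassociatedEvent

    val addressToExecutorId = new HashMap[Address, String]

    // On disassociation, the address alone identifies the executor.
    def onDisassociated(event: DisassociatedEvent): Unit =
      addressToExecutorId.get(event.remoteAddress) match {
        case Some(executorId) => println(s"Executor $executorId disconnected, so removing it")
        case None             => println("Disassociation from an unknown peer; ignoring")
      }
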
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index a4aa316e4b..e5de16fc01 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -17,24 +17,15 @@
package org.apache.spark.storage
-import akka.actor._
-import scala.concurrent.Await
-import scala.concurrent.Future
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
+import akka.actor._
import akka.pattern.ask
-import scala.concurrent.duration._
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.storage.BlockManagerMessages._
-import org.apache.spark.storage.BlockManagerMessages.GetLocations
-import org.apache.spark.storage.BlockManagerMessages.GetLocationsMultipleBlockIds
-import org.apache.spark.storage.BlockManagerMessages.RegisterBlockManager
-import org.apache.spark.storage.BlockManagerMessages.HeartBeat
-import org.apache.spark.storage.BlockManagerMessages.RemoveExecutor
-import org.apache.spark.storage.BlockManagerMessages.GetPeers
-import org.apache.spark.storage.BlockManagerMessages.RemoveBlock
-import org.apache.spark.storage.BlockManagerMessages.RemoveRdd
private[spark] class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection]) extends Logging {
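
The signature above holds the driver endpoint as Either[ActorRef, ActorSelection]; both sides support the ask pattern once akka.pattern.ask is in scope, so a single match can dispatch uniformly. A hedged sketch of such a helper (askDriver is an illustrative name, not the Spark method):

    import scala.concurrent.Future

    import akka.actor.{ActorRef, ActorSelection}
    import akka.pattern.ask
    import akka.util.Timeout

    // Dispatch an ask to whichever endpoint flavour we were handed.
    def askDriver(driverActor: Either[ActorRef, ActorSelection], message: Any)
                 (implicit timeout: Timeout): Future[Any] =
      driverActor match {
        case Left(ref)        => ref ? message
        case Right(selection) => selection ? message
      }
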
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 23e9b735f3..3444d8fdfe 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -44,8 +44,9 @@ private[spark] object AkkaUtils {
val akkaFrameSize = System.getProperty("spark.akka.frameSize", "10").toInt
val lifecycleEvents = if (System.getProperty("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
- val akkaHeartBeatPauses = System.getProperty("spark.akka.pauses", "60").toInt
- val akkaFailureDetector = System.getProperty("spark.akka.failure-detector.threshold", "12.0").toDouble
+ val akkaHeartBeatPauses = System.getProperty("spark.akka.heartbeat.pauses", "60").toInt
+ val akkaFailureDetector =
+ System.getProperty("spark.akka.failure-detector.threshold", "12.0").toDouble
val akkaHeartBeatInterval = System.getProperty("spark.akka.heartbeat.interval", "5").toInt
val akkaConf = ConfigFactory.parseString(
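
The renamed spark.akka.heartbeat.pauses property and its peers feed Akka's remoting failure detector. The diff is cut off before the parseString body, so the sketch below pairs the same properties with standard Akka 2.2 remoting keys chosen for illustration, not the verbatim string Spark builds:

    import com.typesafe.config.ConfigFactory

    val akkaHeartBeatPauses = System.getProperty("spark.akka.heartbeat.pauses", "60").toInt
    val akkaFailureDetector =
      System.getProperty("spark.akka.failure-detector.threshold", "12.0").toDouble
    val akkaHeartBeatInterval = System.getProperty("spark.akka.heartbeat.interval", "5").toInt

    // Interpolate the properties into an Akka remoting config fragment.
    val akkaConf = ConfigFactory.parseString(s"""
      |akka.remote.transport-failure-detector.heartbeat-interval = ${akkaHeartBeatInterval}s
      |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = ${akkaHeartBeatPauses}s
      |akka.remote.transport-failure-detector.threshold = $akkaFailureDetector
      """.stripMargin)
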