Diffstat (limited to 'core/src/main/scala/org/apache/spark/deploy/master/Master.scala')
 core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 41 +++++++++++++++++++++++------------------
 1 file changed, 23 insertions(+), 18 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index eebd0794b8..7b696cfcca 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -29,7 +29,7 @@ import akka.pattern.ask
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import akka.serialization.SerializationExtension
-import org.apache.spark.{Logging, SparkException}
+import org.apache.spark.{SparkConf, SparkContext, Logging, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.MasterMessages._
@@ -38,14 +38,16 @@ import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.{AkkaUtils, Utils}
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
- import context.dispatcher
+ import context.dispatcher // to use Akka's scheduler.schedule()
+
+ val conf = new SparkConf
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
- val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000
- val RETAINED_APPLICATIONS = System.getProperty("spark.deploy.retainedApplications", "200").toInt
- val REAPER_ITERATIONS = System.getProperty("spark.dead.worker.persistence", "15").toInt
- val RECOVERY_DIR = System.getProperty("spark.deploy.recoveryDirectory", "")
- val RECOVERY_MODE = System.getProperty("spark.deploy.recoveryMode", "NONE")
+ val WORKER_TIMEOUT = conf.get("spark.worker.timeout", "60").toLong * 1000
+ val RETAINED_APPLICATIONS = conf.get("spark.deploy.retainedApplications", "200").toInt
+ val REAPER_ITERATIONS = conf.get("spark.dead.worker.persistence", "15").toInt
+ val RECOVERY_DIR = conf.get("spark.deploy.recoveryDirectory", "")
+ val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")
var nextAppNumber = 0
val workers = new HashSet[WorkerInfo]
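
The block above is the core of the change: each configuration lookup moves from System.getProperty to a SparkConf instance. A minimal sketch of the equivalence, assuming SparkConf's no-arg constructor loads spark.* JVM system properties (its default behavior when it was introduced):

    import org.apache.spark.SparkConf

    // new SparkConf picks up any -Dspark.* system properties, so settings
    // passed the old System.getProperty way still surface through conf.get.
    val conf = new SparkConf
    val workerTimeoutMs = conf.get("spark.worker.timeout", "60").toLong * 1000
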
@@ -63,8 +65,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
Utils.checkHost(host, "Expected hostname")
- val masterMetricsSystem = MetricsSystem.createMetricsSystem("master")
- val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications")
+ val masterMetricsSystem = MetricsSystem.createMetricsSystem("master", conf)
+ val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications", conf)
val masterSource = new MasterSource(this)
val webUi = new MasterWebUI(this, webUiPort)
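
With a conf in scope, both metrics systems are created against it rather than against global properties. A short sketch of the assumed lifecycle; the registerSource/start calls come from the surrounding Master code, not from this hunk:

    // Create the metrics system against the master's conf, then register
    // its source and start it (Master does this in preStart()).
    val masterMetrics = MetricsSystem.createMetricsSystem("master", conf)
    masterMetrics.registerSource(masterSource)
    masterMetrics.start()
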
@@ -86,7 +88,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
// As a temporary workaround before better ways of configuring memory, we allow users to set
// a flag that will perform round-robin scheduling across the nodes (spreading out each app
// among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
- val spreadOutApps = System.getProperty("spark.deploy.spreadOut", "true").toBoolean
+ val spreadOutApps = conf.get("spark.deploy.spreadOut", "true").toBoolean
override def preStart() {
logInfo("Starting Spark master at " + masterUrl)
@@ -103,7 +105,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
persistenceEngine = RECOVERY_MODE match {
case "ZOOKEEPER" =>
logInfo("Persisting recovery state to ZooKeeper")
- new ZooKeeperPersistenceEngine(SerializationExtension(context.system))
+ new ZooKeeperPersistenceEngine(SerializationExtension(context.system), conf)
case "FILESYSTEM" =>
logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
new FileSystemPersistenceEngine(RECOVERY_DIR, SerializationExtension(context.system))
@@ -113,7 +115,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
leaderElectionAgent = RECOVERY_MODE match {
case "ZOOKEEPER" =>
- context.actorOf(Props(classOf[ZooKeeperLeaderElectionAgent], self, masterUrl))
+ context.actorOf(Props(classOf[ZooKeeperLeaderElectionAgent], self, masterUrl, conf))
case _ =>
context.actorOf(Props(classOf[MonarchyLeaderAgent], self))
}
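
Both ZooKeeper-backed components (the persistence engine above and the leader election agent here) now receive the conf, so ZooKeeper settings are read from it instead of from system properties. A hedged usage sketch; the key name comes from the RECOVERY_MODE lookup earlier in this diff:

    // Select ZooKeeper-backed recovery through SparkConf rather than
    // through a -Dspark.deploy.recoveryMode JVM flag.
    val conf = new SparkConf
    conf.set("spark.deploy.recoveryMode", "ZOOKEEPER")
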
@@ -495,7 +497,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
removeWorker(worker)
} else {
if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT))
- workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
+ workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
}
}
}
@@ -507,8 +509,9 @@ private[spark] object Master {
val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
def main(argStrings: Array[String]) {
- val args = new MasterArguments(argStrings)
- val (actorSystem, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort)
+ val conf = new SparkConf
+ val args = new MasterArguments(argStrings, conf)
+ val (actorSystem, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
actorSystem.awaitTermination()
}
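
main() now builds the SparkConf up front and threads it through argument parsing and actor startup, so command-line flags and configuration are resolved against the same object. A sketch of an equivalent programmatic launch; the --host and --webui-port flag names are an assumption based on the deploy module's usual argument style, not shown in this diff:

    // Hypothetical invocation: flag names are assumed, not confirmed here.
    val conf = new SparkConf
    val args = new MasterArguments(
      Array("--host", "127.0.0.1", "--webui-port", "8080"), conf)
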
@@ -522,10 +525,12 @@ private[spark] object Master {
}
}
- def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int, Int) = {
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
+ def startSystemAndActor(host: String, port: Int, webUiPort: Int, conf: SparkConf)
+ : (ActorSystem, Int, Int) =
+ {
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf)
val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), actorName)
- val timeout = AkkaUtils.askTimeout
+ val timeout = AkkaUtils.askTimeout(conf)
val respFuture = actor.ask(RequestWebUIPort)(timeout)
val resp = Await.result(respFuture, timeout).asInstanceOf[WebUIPortResponse]
(actorSystem, boundPort, resp.webUIBoundPort)
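
The widened signature makes the configuration explicit at every call site, and the same conf now also parameterizes AkkaUtils.askTimeout instead of a global lookup. A minimal sketch of embedding a master with the new signature (host and ports are placeholder values):

    // Start a master and block until its actor system shuts down.
    val conf = new SparkConf
    val (actorSystem, boundPort, webUiBoundPort) =
      Master.startSystemAndActor("localhost", 7077, 8080, conf)
    actorSystem.awaitTermination()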