author    Tathagata Das <tathagata.das1565@gmail.com>  2014-01-06 03:05:52 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>  2014-01-06 03:05:52 -0800
commit    3b4c4c7f4d0d6e45a1acb0baf0d9416a8997b686 (patch)
tree      98b744beacdb06925eba8413288c473b13f49174 /core/src/main/scala/org/apache/spark/deploy
parent    d0fd3b9ad238294346eb3465c489eabd41fb2380 (diff)
parent    a2e7e0497484554f86bd71e93705eb0422b1512b (diff)
Merge remote-tracking branch 'apache/master' into project-refactor
Conflicts:
	examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
	streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
	streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
	streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
	streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
	streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
Diffstat (limited to 'core/src/main/scala/org/apache/spark/deploy')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala |  7
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala | 14
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/client/Client.scala | 21
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 41
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala |  7
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala |  9
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala |  8
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 34
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala |  6
13 files changed, 97 insertions(+), 77 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index 0aa8852649..4dfb19ed8a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -190,7 +190,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
/** Creates a SparkContext, which constructs a Client to interact with our cluster. */
def createClient() = {
if (sc != null) { sc.stop() }
- // Counter-hack: Because of a hack in SparkEnv#createFromSystemProperties() that changes this
+ // Counter-hack: Because of a hack in SparkEnv#create() that changes this
// property, we need to reset it.
System.setProperty("spark.driver.port", "0")
sc = new SparkContext(getMasterUrls(masters), "fault-tolerance", containerSparkHome)
@@ -417,4 +417,4 @@ private[spark] object Docker extends Logging {
"docker ps -l -q".!(ProcessLogger(line => id = line))
new DockerId(id)
}
-}
\ No newline at end of file
+}
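
The "counter-hack" in the first hunk is easiest to see in isolation. A minimal sketch, not from the patch itself, of the reset-then-recreate pattern it documents; the two-argument SparkContext constructor and app name are illustrative:

import org.apache.spark.SparkContext

def freshContext(masterUrl: String): SparkContext = {
  // SparkEnv#create() overwrites spark.driver.port with the port it actually
  // bound, so a second SparkContext in the same JVM would inherit a stale,
  // possibly occupied port. Resetting to "0" asks the OS for a free one.
  System.setProperty("spark.driver.port", "0")
  new SparkContext(masterUrl, "fault-tolerance-sketch") // illustrative app name
}
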
diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index 59d12a3e6f..ffc0cb0903 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -22,7 +22,7 @@ import akka.actor.ActorSystem
import org.apache.spark.deploy.worker.Worker
import org.apache.spark.deploy.master.Master
import org.apache.spark.util.Utils
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
import scala.collection.mutable.ArrayBuffer
@@ -43,7 +43,8 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I
logInfo("Starting a local Spark cluster with " + numWorkers + " workers.")
/* Start the Master */
- val (masterSystem, masterPort, _) = Master.startSystemAndActor(localHostname, 0, 0)
+ val conf = new SparkConf(false)
+ val (masterSystem, masterPort, _) = Master.startSystemAndActor(localHostname, 0, 0, conf)
masterActorSystems += masterSystem
val masterUrl = "spark://" + localHostname + ":" + masterPort
val masters = Array(masterUrl)
@@ -55,7 +56,7 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I
workerActorSystems += workerSystem
}
- return masters
+ masters
}
def stop() {
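
The new SparkConf(false) deserves a note: the flag is loadDefaults, and passing false keeps the embedded master from silently inheriting spark.* Java system properties already set in the driver JVM. A small sketch of the difference, assuming a Spark of this era on the classpath:

import org.apache.spark.SparkConf

System.setProperty("spark.worker.timeout", "10")

val inherited = new SparkConf(loadDefaults = true)  // seeded from spark.* system properties
val isolated  = new SparkConf(loadDefaults = false) // starts empty, as LocalSparkCluster wants

assert(inherited.get("spark.worker.timeout") == "10")
assert(isolated.get("spark.worker.timeout", "60") == "60") // falls back to the default
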
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index fc1537f796..27dc42bf7e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -34,10 +34,10 @@ class SparkHadoopUtil {
UserGroupInformation.setConfiguration(conf)
def runAsUser(user: String)(func: () => Unit) {
- // if we are already running as the user intended there is no reason to do the doAs. It
+ // if we are already running as the user intended there is no reason to do the doAs. It
// will actually break secure HDFS access as it doesn't fill in the credentials. Also if
- // the user is UNKNOWN then we shouldn't be creating a remote unknown user
- // (this is actually the path spark on yarn takes) since SPARK_USER is initialized only
+ // the user is UNKNOWN then we shouldn't be creating a remote unknown user
+ // (this is actually the path spark on yarn takes) since SPARK_USER is initialized only
// in SparkContext.
val currentUser = Option(System.getProperty("user.name")).
getOrElse(SparkContext.SPARK_UNKNOWN_USER)
@@ -67,11 +67,15 @@ class SparkHadoopUtil {
}
object SparkHadoopUtil {
+
private val hadoop = {
- val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
+ val yarnMode = java.lang.Boolean.valueOf(
+ System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
if (yarnMode) {
try {
- Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil]
+ Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil")
+ .newInstance()
+ .asInstanceOf[SparkHadoopUtil]
} catch {
case th: Throwable => throw new SparkException("Unable to load YARN support", th)
}
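
The reflective load above is what keeps core free of any compile-time dependency on the YARN module. A sketch of the same pattern with the non-YARN fallback made explicit; the helper name is illustrative:

import org.apache.spark.SparkException
import org.apache.spark.deploy.SparkHadoopUtil

def loadHadoopUtil(yarnMode: Boolean): SparkHadoopUtil =
  if (yarnMode) {
    try {
      // Resolved by name at runtime; the class need not exist at compile time.
      Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil")
        .newInstance()
        .asInstanceOf[SparkHadoopUtil]
    } catch {
      case th: Throwable => throw new SparkException("Unable to load YARN support", th)
    }
  } else {
    new SparkHadoopUtil // the non-YARN default, as in the surrounding object
  }
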
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index 953755e40d..481026eaa2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -19,20 +19,19 @@ package org.apache.spark.deploy.client
import java.util.concurrent.TimeoutException
-import scala.concurrent.duration._
import scala.concurrent.Await
+import scala.concurrent.duration._
import akka.actor._
import akka.pattern.ask
-import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent}
+import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}
-import org.apache.spark.{SparkException, Logging}
+import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
import org.apache.spark.util.AkkaUtils
-
/**
* The main class used to talk to a Spark deploy cluster. Takes a master URL, an app description,
* and a listener for cluster events, and calls back the listener when various events occur.
@@ -43,7 +42,8 @@ private[spark] class Client(
actorSystem: ActorSystem,
masterUrls: Array[String],
appDescription: ApplicationDescription,
- listener: ClientListener)
+ listener: ClientListener,
+ conf: SparkConf)
extends Logging {
val REGISTRATION_TIMEOUT = 20.seconds
@@ -111,6 +111,12 @@ private[spark] class Client(
}
}
+ private def isPossibleMaster(remoteUrl: Address) = {
+ masterUrls.map(s => Master.toAkkaUrl(s))
+ .map(u => AddressFromURIString(u).hostPort)
+ .contains(remoteUrl.hostPort)
+ }
+
override def receive = {
case RegisteredApplication(appId_, masterUrl) =>
appId = appId_
@@ -146,6 +152,9 @@ private[spark] class Client(
logWarning(s"Connection to $address failed; waiting for master to reconnect...")
markDisconnected()
+ case AssociationErrorEvent(cause, _, address, _) if isPossibleMaster(address) =>
+ logWarning(s"Could not connect to $address: $cause")
+
case StopClient =>
markDead()
sender ! true
@@ -178,7 +187,7 @@ private[spark] class Client(
def stop() {
if (actor != null) {
try {
- val timeout = AkkaUtils.askTimeout
+ val timeout = AkkaUtils.askTimeout(conf)
val future = actor.ask(StopClient)(timeout)
Await.result(future, timeout)
} catch {
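
The new isPossibleMaster guard keeps the client from warning about association errors from unrelated peers: only addresses that resolve to one of the configured masters are reported. A standalone sketch of the check, with illustrative URLs:

import akka.actor.{Address, AddressFromURIString}
import org.apache.spark.deploy.master.Master

val masterUrls = Array("spark://host-a:7077", "spark://host-b:7077") // illustrative

def isPossibleMaster(remoteUrl: Address): Boolean =
  masterUrls.map(Master.toAkkaUrl)              // spark://host:port -> Akka actor URL
    .map(u => AddressFromURIString(u).hostPort) // compare on system@host:port only
    .contains(remoteUrl.hostPort)
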
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
index 5b62d3ba6c..ef649fd80c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -18,7 +18,7 @@
package org.apache.spark.deploy.client
import org.apache.spark.util.{Utils, AkkaUtils}
-import org.apache.spark.{Logging}
+import org.apache.spark.{SparkConf, SparkContext, Logging}
import org.apache.spark.deploy.{Command, ApplicationDescription}
private[spark] object TestClient {
@@ -45,11 +45,13 @@ private[spark] object TestClient {
def main(args: Array[String]) {
val url = args(0)
- val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0)
+ val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
+ conf = new SparkConf)
val desc = new ApplicationDescription(
- "TestClient", 1, 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()), "dummy-spark-home", "ignored")
+ "TestClient", 1, 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()),
+ "dummy-spark-home", "ignored")
val listener = new TestListener
- val client = new Client(actorSystem, Array(url), desc, listener)
+ val client = new Client(actorSystem, Array(url), desc, listener, new SparkConf)
client.start()
actorSystem.awaitTermination()
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index eebd0794b8..7b696cfcca 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -29,7 +29,7 @@ import akka.pattern.ask
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import akka.serialization.SerializationExtension
-import org.apache.spark.{Logging, SparkException}
+import org.apache.spark.{SparkConf, SparkContext, Logging, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.MasterMessages._
@@ -38,14 +38,16 @@ import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.{AkkaUtils, Utils}
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
- import context.dispatcher
+ import context.dispatcher // to use Akka's scheduler.schedule()
+
+ val conf = new SparkConf
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
- val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000
- val RETAINED_APPLICATIONS = System.getProperty("spark.deploy.retainedApplications", "200").toInt
- val REAPER_ITERATIONS = System.getProperty("spark.dead.worker.persistence", "15").toInt
- val RECOVERY_DIR = System.getProperty("spark.deploy.recoveryDirectory", "")
- val RECOVERY_MODE = System.getProperty("spark.deploy.recoveryMode", "NONE")
+ val WORKER_TIMEOUT = conf.get("spark.worker.timeout", "60").toLong * 1000
+ val RETAINED_APPLICATIONS = conf.get("spark.deploy.retainedApplications", "200").toInt
+ val REAPER_ITERATIONS = conf.get("spark.dead.worker.persistence", "15").toInt
+ val RECOVERY_DIR = conf.get("spark.deploy.recoveryDirectory", "")
+ val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")
var nextAppNumber = 0
val workers = new HashSet[WorkerInfo]
@@ -63,8 +65,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
Utils.checkHost(host, "Expected hostname")
- val masterMetricsSystem = MetricsSystem.createMetricsSystem("master")
- val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications")
+ val masterMetricsSystem = MetricsSystem.createMetricsSystem("master", conf)
+ val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications", conf)
val masterSource = new MasterSource(this)
val webUi = new MasterWebUI(this, webUiPort)
@@ -86,7 +88,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
// As a temporary workaround before better ways of configuring memory, we allow users to set
// a flag that will perform round-robin scheduling across the nodes (spreading out each app
// among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
- val spreadOutApps = System.getProperty("spark.deploy.spreadOut", "true").toBoolean
+ val spreadOutApps = conf.get("spark.deploy.spreadOut", "true").toBoolean
override def preStart() {
logInfo("Starting Spark master at " + masterUrl)
@@ -103,7 +105,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
persistenceEngine = RECOVERY_MODE match {
case "ZOOKEEPER" =>
logInfo("Persisting recovery state to ZooKeeper")
- new ZooKeeperPersistenceEngine(SerializationExtension(context.system))
+ new ZooKeeperPersistenceEngine(SerializationExtension(context.system), conf)
case "FILESYSTEM" =>
logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
new FileSystemPersistenceEngine(RECOVERY_DIR, SerializationExtension(context.system))
@@ -113,7 +115,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
leaderElectionAgent = RECOVERY_MODE match {
case "ZOOKEEPER" =>
- context.actorOf(Props(classOf[ZooKeeperLeaderElectionAgent], self, masterUrl))
+ context.actorOf(Props(classOf[ZooKeeperLeaderElectionAgent], self, masterUrl, conf))
case _ =>
context.actorOf(Props(classOf[MonarchyLeaderAgent], self))
}
@@ -495,7 +497,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
removeWorker(worker)
} else {
if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT))
- workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
+ workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
}
}
}
@@ -507,8 +509,9 @@ private[spark] object Master {
val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
def main(argStrings: Array[String]) {
- val args = new MasterArguments(argStrings)
- val (actorSystem, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort)
+ val conf = new SparkConf
+ val args = new MasterArguments(argStrings, conf)
+ val (actorSystem, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
actorSystem.awaitTermination()
}
@@ -522,10 +525,12 @@ private[spark] object Master {
}
}
- def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int, Int) = {
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
+ def startSystemAndActor(host: String, port: Int, webUiPort: Int, conf: SparkConf)
+ : (ActorSystem, Int, Int) =
+ {
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf)
val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), actorName)
- val timeout = AkkaUtils.askTimeout
+ val timeout = AkkaUtils.askTimeout(conf)
val respFuture = actor.ask(RequestWebUIPort)(timeout)
val resp = Await.result(respFuture, timeout).asInstanceOf[WebUIPortResponse]
(actorSystem, boundPort, resp.webUIBoundPort)
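
Master.scala shows the patch's recurring migration in one place: System.getProperty with an inline default becomes conf.get with the same default. A sketch; since the no-arg SparkConf still seeds itself from spark.* system properties, settings made the old way keep working:

import org.apache.spark.SparkConf

val conf = new SparkConf // loadDefaults = true: picks up spark.* system properties
val workerTimeoutMs = conf.get("spark.worker.timeout", "60").toLong * 1000
val retainedApps    = conf.get("spark.deploy.retainedApplications", "200").toInt
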
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
index 9d89b455fb..e7f3224091 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
@@ -18,16 +18,17 @@
package org.apache.spark.deploy.master
import org.apache.spark.util.{Utils, IntParam}
+import org.apache.spark.SparkConf
/**
* Command-line parser for the master.
*/
-private[spark] class MasterArguments(args: Array[String]) {
+private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
var host = Utils.localHostName()
var port = 7077
var webUiPort = 8080
-
- // Check for settings in environment variables
+
+ // Check for settings in environment variables
if (System.getenv("SPARK_MASTER_HOST") != null) {
host = System.getenv("SPARK_MASTER_HOST")
}
@@ -37,8 +38,8 @@ private[spark] class MasterArguments(args: Array[String]) {
if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) {
webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt
}
- if (System.getProperty("master.ui.port") != null) {
- webUiPort = System.getProperty("master.ui.port").toInt
+ if (conf.contains("master.ui.port")) {
+ webUiPort = conf.get("master.ui.port").toInt
}
parse(args.toList)
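
After this hunk the master's web UI port is resolved in layers, last writer wins. A sketch of the order (helper name illustrative; the --webui-port flag handled by parse() afterwards can still override the result):

import org.apache.spark.SparkConf

def resolveWebUiPort(conf: SparkConf): Int = {
  var webUiPort = 8080                                               // compiled-in default
  sys.env.get("SPARK_MASTER_WEBUI_PORT").foreach(p => webUiPort = p.toInt) // environment
  if (conf.contains("master.ui.port")) webUiPort = conf.get("master.ui.port").toInt // conf
  webUiPort
}
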
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
index 6cc7fd2ff4..999090ad74 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
@@ -23,7 +23,7 @@ import org.apache.zookeeper._
import org.apache.zookeeper.Watcher.Event.KeeperState
import org.apache.zookeeper.data.Stat
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
/**
* Provides a Scala-side interface to the standard ZooKeeper client, with the addition of retry
@@ -35,8 +35,9 @@ import org.apache.spark.Logging
* Additionally, all commands sent to ZooKeeper will be retried until they either fail too many
* times or a semantic exception is thrown (e.g., "node already exists").
*/
-private[spark] class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) extends Logging {
- val ZK_URL = System.getProperty("spark.deploy.zookeeper.url", "")
+private[spark] class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher,
+ conf: SparkConf) extends Logging {
+ val ZK_URL = conf.get("spark.deploy.zookeeper.url", "")
val ZK_ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE
val ZK_TIMEOUT_MILLIS = 30000
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
index 7d535b08de..77c23fb9fb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
@@ -21,16 +21,17 @@ import akka.actor.ActorRef
import org.apache.zookeeper._
import org.apache.zookeeper.Watcher.Event.EventType
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.deploy.master.MasterMessages._
-private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, masterUrl: String)
+private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef,
+ masterUrl: String, conf: SparkConf)
extends LeaderElectionAgent with SparkZooKeeperWatcher with Logging {
- val WORKING_DIR = System.getProperty("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
+ val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
private val watcher = new ZooKeeperWatcher()
- private val zk = new SparkZooKeeperSession(this)
+ private val zk = new SparkZooKeeperSession(this, conf)
private var status = LeadershipStatus.NOT_LEADER
private var myLeaderFile: String = _
private var leaderUrl: String = _
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
index 825344b3bb..52000d4f9c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -17,19 +17,19 @@
package org.apache.spark.deploy.master
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
import org.apache.zookeeper._
import akka.serialization.Serialization
-class ZooKeeperPersistenceEngine(serialization: Serialization)
+class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)
extends PersistenceEngine
with SparkZooKeeperWatcher
with Logging
{
- val WORKING_DIR = System.getProperty("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
+ val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
- val zk = new SparkZooKeeperSession(this)
+ val zk = new SparkZooKeeperSession(this, conf)
zk.connect()
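
The leader-election agent in the previous file and the persistence engine here both derive their znode paths from spark.deploy.zookeeper.dir, so a single setting relocates all of the master's recovery state. Sketch, with an illustrative value:

import org.apache.spark.SparkConf

val conf = new SparkConf().set("spark.deploy.zookeeper.dir", "/spark-staging")
val electionDir = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
val statusDir   = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
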
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index 9ab594b682..ead35662fc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -31,7 +31,7 @@ import org.apache.spark.util.{AkkaUtils, Utils}
*/
private[spark]
class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
- val timeout = AkkaUtils.askTimeout
+ val timeout = AkkaUtils.askTimeout(master.conf)
val host = Utils.localHostName()
val port = requestedPort
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 87531b6719..fcaf4e92b1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -25,23 +25,14 @@ import scala.collection.mutable.HashMap
import scala.concurrent.duration._
import akka.actor._
-import akka.remote.{ DisassociatedEvent, RemotingLifecycleEvent}
-
-import org.apache.spark.{SparkException, Logging}
+import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
+import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
import org.apache.spark.deploy.worker.ui.WorkerWebUI
import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.util.{Utils, AkkaUtils}
-import org.apache.spark.deploy.DeployMessages.WorkerStateResponse
-import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
-import org.apache.spark.deploy.DeployMessages.KillExecutor
-import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
-import org.apache.spark.deploy.DeployMessages.Heartbeat
-import org.apache.spark.deploy.DeployMessages.RegisteredWorker
-import org.apache.spark.deploy.DeployMessages.LaunchExecutor
-import org.apache.spark.deploy.DeployMessages.RegisterWorker
+import org.apache.spark.util.{AkkaUtils, Utils}
/**
* @param masterUrls Each url should look like spark://host:port.
@@ -53,7 +44,8 @@ private[spark] class Worker(
cores: Int,
memory: Int,
masterUrls: Array[String],
- workDirPath: String = null)
+ workDirPath: String = null,
+ val conf: SparkConf)
extends Actor with Logging {
import context.dispatcher
@@ -63,7 +55,7 @@ private[spark] class Worker(
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For worker and executor IDs
// Send a heartbeat every (heartbeat timeout) / 4 milliseconds
- val HEARTBEAT_MILLIS = System.getProperty("spark.worker.timeout", "60").toLong * 1000 / 4
+ val HEARTBEAT_MILLIS = conf.get("spark.worker.timeout", "60").toLong * 1000 / 4
val REGISTRATION_TIMEOUT = 20.seconds
val REGISTRATION_RETRIES = 3
@@ -92,7 +84,7 @@ private[spark] class Worker(
var coresUsed = 0
var memoryUsed = 0
- val metricsSystem = MetricsSystem.createMetricsSystem("worker")
+ val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf)
val workerSource = new WorkerSource(this)
def coresFree: Int = cores - coresUsed
@@ -275,6 +267,7 @@ private[spark] class Worker(
}
private[spark] object Worker {
+
def main(argStrings: Array[String]) {
val args = new WorkerArguments(argStrings)
val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
@@ -283,13 +276,16 @@ private[spark] object Worker {
}
def startSystemAndActor(host: String, port: Int, webUiPort: Int, cores: Int, memory: Int,
- masterUrls: Array[String], workDir: String, workerNumber: Option[Int] = None)
- : (ActorSystem, Int) = {
+ masterUrls: Array[String], workDir: String, workerNumber: Option[Int] = None)
+ : (ActorSystem, Int) =
+ {
// The LocalSparkCluster runs multiple local sparkWorkerX actor systems
+ val conf = new SparkConf
val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,
+ conf = conf)
actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
- masterUrls, workDir), name = "Worker")
+ masterUrls, workDir, conf), name = "Worker")
(actorSystem, boundPort)
}
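
One detail worth keeping in mind from this file: HEARTBEAT_MILLIS is derived as a quarter of spark.worker.timeout, so a worker must miss several consecutive heartbeats before the master, which waits the full timeout, declares it dead. A sketch of the arithmetic:

import org.apache.spark.SparkConf

val conf = new SparkConf
val workerTimeoutMs = conf.get("spark.worker.timeout", "60").toLong * 1000     // master side
val heartbeatMs     = conf.get("spark.worker.timeout", "60").toLong * 1000 / 4 // worker side
assert(heartbeatMs * 4 == workerTimeoutMs) // four heartbeats per timeout window
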
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index 19aa800a95..c382034c99 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -22,7 +22,7 @@ import java.io.File
import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.server.{Handler, Server}
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.worker.Worker
import org.apache.spark.ui.{JettyUtils, UIUtils}
import org.apache.spark.ui.JettyUtils._
@@ -34,10 +34,10 @@ import org.apache.spark.util.{AkkaUtils, Utils}
private[spark]
class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[Int] = None)
extends Logging {
- val timeout = AkkaUtils.askTimeout
+ val timeout = AkkaUtils.askTimeout(worker.conf)
val host = Utils.localHostName()
val port = requestedPort.getOrElse(
- System.getProperty("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt)
+ worker.conf.get("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt)
var server: Option[Server] = None
var boundPort: Option[Int] = None
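
The closing hunk fixes the worker UI port lookup. A sketch of the precedence with a hypothetical helper; the default port is passed as a String here because that is what the .toInt in the patch implies about WorkerWebUI.DEFAULT_PORT:

import org.apache.spark.SparkConf

def resolveUiPort(requestedPort: Option[Int], conf: SparkConf, defaultPort: String): Int =
  requestedPort.getOrElse(conf.get("worker.ui.port", defaultPort).toInt)
  // explicit port wins, then worker.ui.port from the worker's own conf, then the default
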