diff options
author | Mridul Muralidharan <mridul@gmail.com> | 2013-04-17 04:12:18 +0530 |
---|---|---|
committer | Mridul Muralidharan <mridul@gmail.com> | 2013-04-17 04:12:18 +0530 |
commit | bcdde331c3ed68af27bc5d6067c78f68dbd6b032 (patch) | |
tree | c46b78fd66ce57607c14505da493fa0eea981f0b /core | |
parent | ad80f68eb5d153d7f666447966755efce186d021 (diff) | |
download | spark-bcdde331c3ed68af27bc5d6067c78f68dbd6b032.tar.gz spark-bcdde331c3ed68af27bc5d6067c78f68dbd6b032.tar.bz2 spark-bcdde331c3ed68af27bc5d6067c78f68dbd6b032.zip |
Move from master to driver
Diffstat (limited to 'core')
-rw-r--r-- | core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala | 20 | ||||
-rw-r--r-- | core/src/hadoop2-yarn/scala/spark/deploy/yarn/YarnAllocationHandler.scala | 6 |
2 files changed, 13 insertions, 13 deletions
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala index 65361e0ed9..ae719267e8 100644 --- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala +++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala @@ -76,7 +76,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e // Start the user's JAR userThread = startUserClass() - // This a bit hacky, but we need to wait until the spark.master.port property has + // This a bit hacky, but we need to wait until the spark.driver.port property has // been set by the Thread executing the user class. waitForSparkMaster() @@ -124,19 +124,19 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e } private def waitForSparkMaster() { - logInfo("Waiting for spark master to be reachable.") - var masterUp = false - while(!masterUp) { - val masterHost = System.getProperty("spark.master.host") - val masterPort = System.getProperty("spark.master.port") + logInfo("Waiting for spark driver to be reachable.") + var driverUp = false + while(!driverUp) { + val driverHost = System.getProperty("spark.driver.host") + val driverPort = System.getProperty("spark.driver.port") try { - val socket = new Socket(masterHost, masterPort.toInt) + val socket = new Socket(driverHost, driverPort.toInt) socket.close() - logInfo("Master now available: " + masterHost + ":" + masterPort) - masterUp = true + logInfo("Master now available: " + driverHost + ":" + driverPort) + driverUp = true } catch { case e: Exception => - logError("Failed to connect to master at " + masterHost + ":" + masterPort) + logError("Failed to connect to driver at " + driverHost + ":" + driverPort) Thread.sleep(100) } } diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/YarnAllocationHandler.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/YarnAllocationHandler.scala index cac9dab401..61dd72a651 100644 --- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/YarnAllocationHandler.scala @@ -191,8 +191,8 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM else { // deallocate + allocate can result in reusing id's wrongly - so use a different counter (workerIdCounter) val workerId = workerIdCounter.incrementAndGet().toString - val masterUrl = "akka://spark@%s:%s/user/%s".format( - System.getProperty("spark.master.host"), System.getProperty("spark.master.port"), + val driverUrl = "akka://spark@%s:%s/user/%s".format( + System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"), StandaloneSchedulerBackend.ACTOR_NAME) logInfo("launching container on " + containerId + " host " + workerHostname) @@ -209,7 +209,7 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM } new Thread( - new WorkerRunnable(container, conf, masterUrl, workerId, + new WorkerRunnable(container, conf, driverUrl, workerId, workerHostname, workerMemory, workerCores) ).start() } |