From bcdde331c3ed68af27bc5d6067c78f68dbd6b032 Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Wed, 17 Apr 2013 04:12:18 +0530 Subject: Move from master to driver --- .../scala/spark/deploy/yarn/ApplicationMaster.scala | 20 ++++++++++---------- .../spark/deploy/yarn/YarnAllocationHandler.scala | 6 +++--- 2 files changed, 13 insertions(+), 13 deletions(-) (limited to 'core') diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala index 65361e0ed9..ae719267e8 100644 --- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala +++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala @@ -76,7 +76,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e // Start the user's JAR userThread = startUserClass() - // This a bit hacky, but we need to wait until the spark.master.port property has + // This a bit hacky, but we need to wait until the spark.driver.port property has // been set by the Thread executing the user class. waitForSparkMaster() @@ -124,19 +124,19 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e } private def waitForSparkMaster() { - logInfo("Waiting for spark master to be reachable.") - var masterUp = false - while(!masterUp) { - val masterHost = System.getProperty("spark.master.host") - val masterPort = System.getProperty("spark.master.port") + logInfo("Waiting for spark driver to be reachable.") + var driverUp = false + while(!driverUp) { + val driverHost = System.getProperty("spark.driver.host") + val driverPort = System.getProperty("spark.driver.port") try { - val socket = new Socket(masterHost, masterPort.toInt) + val socket = new Socket(driverHost, driverPort.toInt) socket.close() - logInfo("Master now available: " + masterHost + ":" + masterPort) - masterUp = true + logInfo("Master now available: " + driverHost + ":" + driverPort) + driverUp = true } catch { case e: Exception => - logError("Failed to connect to master at " + masterHost + ":" + masterPort) + logError("Failed to connect to driver at " + driverHost + ":" + driverPort) Thread.sleep(100) } } diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/YarnAllocationHandler.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/YarnAllocationHandler.scala index cac9dab401..61dd72a651 100644 --- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/YarnAllocationHandler.scala @@ -191,8 +191,8 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM else { // deallocate + allocate can result in reusing id's wrongly - so use a different counter (workerIdCounter) val workerId = workerIdCounter.incrementAndGet().toString - val masterUrl = "akka://spark@%s:%s/user/%s".format( - System.getProperty("spark.master.host"), System.getProperty("spark.master.port"), + val driverUrl = "akka://spark@%s:%s/user/%s".format( + System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"), StandaloneSchedulerBackend.ACTOR_NAME) logInfo("launching container on " + containerId + " host " + workerHostname) @@ -209,7 +209,7 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM } new Thread( - new WorkerRunnable(container, conf, masterUrl, workerId, + new WorkerRunnable(container, conf, driverUrl, workerId, workerHostname, workerMemory, workerCores) ).start() } -- cgit v1.2.3