diff options
Diffstat (limited to 'core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala')
-rw-r--r-- | core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala | 32 |
1 files changed, 28 insertions, 4 deletions
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala index 1047f71c6a..ebe2ac68d8 100644 --- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala +++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala @@ -12,23 +12,27 @@ import spark.scheduler.cluster.RegisteredExecutor import spark.scheduler.cluster.LaunchTask import spark.scheduler.cluster.RegisterExecutorFailed import spark.scheduler.cluster.RegisterExecutor +import spark.Utils +import spark.deploy.SparkHadoopUtil private[spark] class StandaloneExecutorBackend( driverUrl: String, executorId: String, - hostname: String, + hostPort: String, cores: Int) extends Actor with ExecutorBackend with Logging { + Utils.checkHostPort(hostPort, "Expected hostport") + var executor: Executor = null var driver: ActorRef = null override def preStart() { logInfo("Connecting to driver: " + driverUrl) driver = context.actorFor(driverUrl) - driver ! RegisterExecutor(executorId, hostname, cores) + driver ! RegisterExecutor(executorId, hostPort, cores) context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) context.watch(driver) // Doesn't work with remote actors, but useful for testing } @@ -36,7 +40,8 @@ private[spark] class StandaloneExecutorBackend( override def receive = { case RegisteredExecutor(sparkProperties) => logInfo("Successfully registered with driver") - executor = new Executor(executorId, hostname, sparkProperties) + // Make this host instead of hostPort ? + executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties) case RegisterExecutorFailed(message) => logError("Slave registration failed: " + message) @@ -63,11 +68,30 @@ private[spark] class StandaloneExecutorBackend( private[spark] object StandaloneExecutorBackend { def run(driverUrl: String, executorId: String, hostname: String, cores: Int) { + SparkHadoopUtil.runAsUser(run0, Tuple4[Any, Any, Any, Any] (driverUrl, executorId, hostname, cores)) + } + + // This will be run 'as' the user + def run0(args: Product) { + assert(4 == args.productArity) + runImpl(args.productElement(0).asInstanceOf[String], + args.productElement(1).asInstanceOf[String], + args.productElement(2).asInstanceOf[String], + args.productElement(3).asInstanceOf[Int]) + } + + private def runImpl(driverUrl: String, executorId: String, hostname: String, cores: Int) { + // Debug code + Utils.checkHost(hostname) + // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor // before getting started with all our system properties, etc val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0) + // set it + val sparkHostPort = hostname + ":" + boundPort + System.setProperty("spark.hostPort", sparkHostPort) val actor = actorSystem.actorOf( - Props(new StandaloneExecutorBackend(driverUrl, executorId, hostname, cores)), + Props(new StandaloneExecutorBackend(driverUrl, executorId, sparkHostPort, cores)), name = "Executor") actorSystem.awaitTermination() } |