From d0533f704681adccc8fe2b814dc9e5082646057a Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Tue, 7 Jan 2014 23:38:46 -0800
Subject: Rename to Client

---
 .../scala/org/apache/spark/deploy/Client.scala     | 114 ++++++++++++++++++++
 .../org/apache/spark/deploy/ClientArguments.scala  | 105 ++++++++++++++++++
 .../apache/spark/deploy/client/DriverClient.scala  | 117 ---------------------
 .../deploy/client/DriverClientArguments.scala      | 105 ------------------
 4 files changed, 219 insertions(+), 222 deletions(-)
 create mode 100644 core/src/main/scala/org/apache/spark/deploy/Client.scala
 create mode 100644 core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
 delete mode 100644 core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
 delete mode 100644 core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala

diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
new file mode 100644
index 0000000000..0475bb17c0
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.Map
+import scala.concurrent._
+
+import akka.actor._
+import org.apache.log4j.{Level, Logger}
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.deploy.DeployMessages._
+import org.apache.spark.deploy.master.Master
+import org.apache.spark.util.{AkkaUtils, Utils}
+
+/**
+ * Actor that sends a single message to the standalone master and returns the response in the
+ * given promise.
+ */
+class DriverActor(master: String, response: Promise[(Boolean, String)]) extends Actor with Logging {
+  override def receive = {
+    case SubmitDriverResponse(success, message) => {
+      response.success((success, message))
+    }
+
+    case KillDriverResponse(success, message) => {
+      response.success((success, message))
+    }
+
+    // Relay all other messages to the master.
+    case message => {
+      logInfo(s"Sending message to master $master...")
+      val masterActor = context.actorSelection(Master.toAkkaUrl(master))
+      masterActor ! message
+    }
+  }
+}
+
+/**
+ * Executable utility for starting and terminating drivers inside of a standalone cluster.
+ */
+object Client {
+
+  def main(args: Array[String]) {
+    val driverArgs = new ClientArguments(args)
+    val conf = new SparkConf()
+
+    if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
+      conf.set("spark.akka.logLifecycleEvents", "true")
+    }
+    conf.set("spark.akka.askTimeout", "5")
+    Logger.getRootLogger.setLevel(driverArgs.logLevel)
+
+    // TODO: See if we can initialize akka so return messages are sent back using the same TCP
+    // flow. Else, this (sadly) requires the DriverClient be routable from the Master.
+    val (actorSystem, _) = AkkaUtils.createActorSystem(
+      "driverClient", Utils.localHostName(), 0, false, conf)
+    val master = driverArgs.master
+    val response = promise[(Boolean, String)]
+    val driver: ActorRef = actorSystem.actorOf(Props(new DriverActor(driverArgs.master, response)))
+
+    println(s"Sending ${driverArgs.cmd} command to ${driverArgs.master}")
+    driverArgs.cmd match {
+      case "launch" =>
+        // TODO: We could add an env variable here and intercept it in `sc.addJar` that would
+        // truncate filesystem paths similar to what YARN does. For now, we just require
+        // people call `addJar` assuming the jar is in the same directory.
+        val env = Map[String, String]()
+        System.getenv().foreach{case (k, v) => env(k) = v}
+
+        val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
+        val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++
+          driverArgs.driverOptions, env)
+
+        val driverDescription = new DriverDescription(
+          driverArgs.jarUrl,
+          driverArgs.memory,
+          driverArgs.cores,
+          driverArgs.supervise,
+          command)
+        driver ! RequestSubmitDriver(driverDescription)
+
+      case "kill" =>
+        val driverId = driverArgs.driverId
+        driver ! RequestKillDriver(driverId)
+    }
+
+    val (success, message) =
+      try {
+        Await.result(response.future, AkkaUtils.askTimeout(conf))
+      } catch {
+        case e: TimeoutException => (false, s"Error: Timed out sending message to $master")
+      }
+    println(message)
+    actorSystem.shutdown()
+    actorSystem.awaitTermination()
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
new file mode 100644
index 0000000000..50b92e1eab
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import scala.collection.mutable.ListBuffer
+
+import org.apache.log4j.Level
+
+/**
+ * Command-line parser for the driver client.
+ */
+private[spark] class ClientArguments(args: Array[String]) {
+  val defaultCores = 1
+  val defaultMemory = 512
+
+  var cmd: String = "" // 'launch' or 'kill'
+  var logLevel = Level.WARN
+
+  // launch parameters
+  var master: String = ""
+  var jarUrl: String = ""
+  var mainClass: String = ""
+  var supervise: Boolean = false
+  var memory: Int = defaultMemory
+  var cores: Int = defaultCores
+  private var _driverOptions = ListBuffer[String]()
+  def driverOptions = _driverOptions.toSeq
+
+  // kill parameters
+  var driverId: String = ""
+
+  parse(args.toList)
+
+  def parse(args: List[String]): Unit = args match {
+    case ("--cores" | "-c") :: value :: tail =>
+      cores = value.toInt
+      parse(tail)
+
+    case ("--memory" | "-m") :: value :: tail =>
+      memory = value.toInt
+      parse(tail)
+
+    case ("--supervise" | "-s") :: tail =>
+      supervise = true
+      parse(tail)
+
+    case ("--help" | "-h") :: tail =>
+      printUsageAndExit(0)
+
+    case ("--verbose" | "-v") :: tail =>
+      logLevel = Level.INFO
+      parse(tail)
+
+    case "launch" :: _master :: _jarUrl :: _mainClass :: tail =>
+      cmd = "launch"
+      master = _master
+      jarUrl = _jarUrl
+      mainClass = _mainClass
+      _driverOptions ++= tail
+
+    case "kill" :: _master :: _driverId :: tail =>
+      cmd = "kill"
+      master = _master
+      driverId = _driverId
+
+    case _ =>
+      printUsageAndExit(1)
+  }
+
+  /**
+   * Print usage and exit JVM with the given exit code.
+   */
+  def printUsageAndExit(exitCode: Int) {
+    // TODO: It wouldn't be too hard to allow users to submit their app and dependency jars
+    // separately similar to in the YARN client.
+    val usage =
+      s"""
+      |Usage: DriverClient [options] launch <active-master> <jar-url> <main-class> [driver options]
+      |Usage: DriverClient kill <active-master> <driver-id>
+      |
+      |Options:
+      |  -c CORES, --cores CORES       Number of cores to request (default: $defaultCores)
+      |  -m MEMORY, --memory MEMORY    Megabytes of memory to request (default: $defaultMemory)
+      |  -s, --supervise               Whether to restart the driver on failure
+      |  -v, --verbose                 Print more debugging output
+      """.stripMargin
+    System.err.println(usage)
+    System.exit(exitCode)
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
deleted file mode 100644
index 8b066ba1a5..0000000000
--- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.client
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.Map
-import scala.concurrent._
-
-import akka.actor._
-import akka.actor.Actor
-
-import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.deploy.{Command, DriverDescription}
-import org.apache.spark.deploy.DeployMessages._
-import org.apache.spark.deploy.master.Master
-import org.apache.spark.util.{AkkaUtils, Utils}
-import org.apache.log4j.{Logger, Level}
-import akka.remote.RemotingLifecycleEvent
-
-/**
- * Actor that sends a single message to the standalone master and returns the response in the
- * given promise.
- */
-class DriverActor(master: String, response: Promise[(Boolean, String)]) extends Actor with Logging {
-  override def receive = {
-    case SubmitDriverResponse(success, message) => {
-      response.success((success, message))
-    }
-
-    case KillDriverResponse(success, message) => {
-      response.success((success, message))
-    }
-
-    // Relay all other messages to the master.
-    case message => {
-      logInfo(s"Sending message to master $master...")
-      val masterActor = context.actorSelection(Master.toAkkaUrl(master))
-      masterActor ! message
-    }
-  }
-}
-
-/**
- * Executable utility for starting and terminating drivers inside of a standalone cluster.
- */
-object DriverClient {
-
-  def main(args: Array[String]) {
-    val driverArgs = new DriverClientArguments(args)
-    val conf = new SparkConf()
-
-    if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
-      conf.set("spark.akka.logLifecycleEvents", "true")
-    }
-    conf.set("spark.akka.askTimeout", "5")
-    Logger.getRootLogger.setLevel(driverArgs.logLevel)
-
-    // TODO: See if we can initialize akka so return messages are sent back using the same TCP
-    // flow. Else, this (sadly) requires the DriverClient be routable from the Master.
-    val (actorSystem, _) = AkkaUtils.createActorSystem(
-      "driverClient", Utils.localHostName(), 0, false, conf)
-    val master = driverArgs.master
-    val response = promise[(Boolean, String)]
-    val driver: ActorRef = actorSystem.actorOf(Props(new DriverActor(driverArgs.master, response)))
-
-    println(s"Sending ${driverArgs.cmd} command to ${driverArgs.master}")
-    driverArgs.cmd match {
-      case "launch" =>
-        // TODO: We could add an env variable here and intercept it in `sc.addJar` that would
-        // truncate filesystem paths similar to what YARN does. For now, we just require
-        // people call `addJar` assuming the jar is in the same directory.
-        val env = Map[String, String]()
-        System.getenv().foreach{case (k, v) => env(k) = v}
-
-        val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
-        val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++
-          driverArgs.driverOptions, env)
-
-        val driverDescription = new DriverDescription(
-          driverArgs.jarUrl,
-          driverArgs.memory,
-          driverArgs.cores,
-          driverArgs.supervise,
-          command)
-        driver ! RequestSubmitDriver(driverDescription)
-
-      case "kill" =>
-        val driverId = driverArgs.driverId
-        driver ! RequestKillDriver(driverId)
-    }
-
-    val (success, message) =
-      try {
-        Await.result(response.future, AkkaUtils.askTimeout(conf))
-      } catch {
-        case e: TimeoutException => (false, s"Error: Timed out sending message to $master")
-      }
-    println(message)
-    actorSystem.shutdown()
-    actorSystem.awaitTermination()
-  }
-}
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
deleted file mode 100644
index 7774a5615c..0000000000
--- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.client
-
-import scala.collection.mutable.ListBuffer
-
-import org.apache.log4j.Level
-
-/**
- * Command-line parser for the driver client.
- */
-private[spark] class DriverClientArguments(args: Array[String]) {
-  val defaultCores = 1
-  val defaultMemory = 512
-
-  var cmd: String = "" // 'launch' or 'kill'
-  var logLevel = Level.WARN
-
-  // launch parameters
-  var master: String = ""
-  var jarUrl: String = ""
-  var mainClass: String = ""
-  var supervise: Boolean = false
-  var memory: Int = defaultMemory
-  var cores: Int = defaultCores
-  private var _driverOptions = ListBuffer[String]()
-  def driverOptions = _driverOptions.toSeq
-
-  // kill parameters
-  var driverId: String = ""
-
-  parse(args.toList)
-
-  def parse(args: List[String]): Unit = args match {
-    case ("--cores" | "-c") :: value :: tail =>
-      cores = value.toInt
-      parse(tail)
-
-    case ("--memory" | "-m") :: value :: tail =>
-      memory = value.toInt
-      parse(tail)
-
-    case ("--supervise" | "-s") :: tail =>
-      supervise = true
-      parse(tail)
-
-    case ("--help" | "-h") :: tail =>
-      printUsageAndExit(0)
-
-    case ("--verbose" | "-v") :: tail =>
-      logLevel = Level.INFO
-      parse(tail)
-
-    case "launch" :: _master :: _jarUrl :: _mainClass :: tail =>
-      cmd = "launch"
-      master = _master
-      jarUrl = _jarUrl
-      mainClass = _mainClass
-      _driverOptions ++= tail
-
-    case "kill" :: _master :: _driverId :: tail =>
-      cmd = "kill"
-      master = _master
-      driverId = _driverId
-
-    case _ =>
-      printUsageAndExit(1)
-  }
-
-  /**
-   * Print usage and exit JVM with the given exit code.
-   */
-  def printUsageAndExit(exitCode: Int) {
-    // TODO: It wouldn't be too hard to allow users to submit their app and dependency jars
-    // separately similar to in the YARN client.
-    val usage =
-      s"""
-      |Usage: DriverClient [options] launch <active-master> <jar-url> <main-class> [driver options]
-      |Usage: DriverClient kill <active-master> <driver-id>
-      |
-      |Options:
-      |  -c CORES, --cores CORES       Number of cores to request (default: $defaultCores)
-      |  -m MEMORY, --memory MEMORY    Megabytes of memory to request (default: $defaultMemory)
-      |  -s, --supervise               Whether to restart the driver on failure
-      |  -v, --verbose                 Print more debugging output
-      """.stripMargin
-    System.err.println(usage)
-    System.exit(exitCode)
-  }
-}
--
cgit v1.2.3
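
A minimal, hypothetical sketch of how the ClientArguments parser introduced above
consumes a command line; it is not part of the commit. The object name, master URL,
jar path, main class, and application flags are made-up placeholders, and the sketch
sits in the same package because ClientArguments is private[spark]. Note that flags
must precede the launch/kill command, since parse() strips them recursively before
matching the command word, and everything after the main class lands in driverOptions.

package org.apache.spark.deploy

object ClientArgumentsSketch {
  def main(args: Array[String]) {
    // Flags first, then the command word, then positional arguments, then
    // pass-through driver options.
    val launch = new ClientArguments(Array(
      "--cores", "4", "--memory", "1024", "--supervise",
      "launch", "spark://host:7077", "hdfs://path/to/app.jar", "com.example.Main",
      "--appFlag", "appValue"))
    assert(launch.cmd == "launch" && launch.cores == 4 && launch.supervise)
    assert(launch.driverOptions == Seq("--appFlag", "appValue"))

    // "kill" only needs the master URL and a driver ID.
    val kill = new ClientArguments(Array(
      "kill", "spark://host:7077", "driver-20140107231204-0000"))
    assert(kill.cmd == "kill" && kill.driverId == "driver-20140107231204-0000")
  }
}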
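
Client.scala gets a reply from the master by pairing a short-lived actor with a
Promise: the actor completes the Promise when a response message arrives, and main()
blocks on the matching Future with a timeout. Below is a self-contained toy of that
pattern under stated assumptions (Scala 2.10 / Akka 2.2-era APIs to match the patch;
Reply, ReplyActor, and the system name are invented), with the actor messaged
directly in place of a real master.

import scala.concurrent.{promise, Await, Promise}
import scala.concurrent.duration._
import akka.actor.{Actor, ActorSystem, Props}

// Stand-in for SubmitDriverResponse / KillDriverResponse.
case class Reply(success: Boolean, message: String)

// Completes the promise exactly once with whatever reply arrives.
class ReplyActor(response: Promise[Reply]) extends Actor {
  def receive = {
    case r: Reply => response.success(r)
  }
}

object PromisePatternSketch {
  def main(args: Array[String]) {
    val system = ActorSystem("sketch")
    val response = promise[Reply]
    val actor = system.actorOf(Props(new ReplyActor(response)))
    // In Client.scala the master sends this message; the sketch sends it directly.
    actor ! Reply(success = true, message = "driver submitted")
    val Reply(ok, msg) = Await.result(response.future, 5.seconds)
    println(s"$ok: $msg")
    system.shutdown()
  }
}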