From 3d939e5fe8930368b46eb49659a9cca8f41c8768 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Tue, 7 Jan 2014 23:27:18 -0800 Subject: Adding --verbose option to DriverClient --- .../org/apache/spark/deploy/client/DriverClient.scala | 18 ++++++++++++++---- .../spark/deploy/client/DriverClientArguments.scala | 8 ++++++++ 2 files changed, 22 insertions(+), 4 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala index 1cd5d99bcf..8b066ba1a5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala @@ -22,12 +22,15 @@ import scala.collection.mutable.Map import scala.concurrent._ import akka.actor._ +import akka.actor.Actor -import org.apache.spark.{SparkConf, Logging} +import org.apache.spark.{Logging, SparkConf} import org.apache.spark.deploy.{Command, DriverDescription} import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.Master import org.apache.spark.util.{AkkaUtils, Utils} +import org.apache.log4j.{Logger, Level} +import akka.remote.RemotingLifecycleEvent /** * Actor that sends a single message to the standalone master and returns the response in the @@ -55,12 +58,18 @@ class DriverActor(master: String, response: Promise[(Boolean, String)]) extends /** * Executable utility for starting and terminating drivers inside of a standalone cluster. */ -object DriverClient extends Logging { +object DriverClient { def main(args: Array[String]) { val driverArgs = new DriverClientArguments(args) val conf = new SparkConf() + if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) { + conf.set("spark.akka.logLifecycleEvents", "true") + } + conf.set("spark.akka.askTimeout", "5") + Logger.getRootLogger.setLevel(driverArgs.logLevel) + // TODO: See if we can initialize akka so return messages are sent back using the same TCP // flow. Else, this (sadly) requires the DriverClient be routable from the Master. val (actorSystem, _) = AkkaUtils.createActorSystem( @@ -69,6 +78,7 @@ object DriverClient extends Logging { val response = promise[(Boolean, String)] val driver: ActorRef = actorSystem.actorOf(Props(new DriverActor(driverArgs.master, response))) + println(s"Sending ${driverArgs.cmd} command to ${driverArgs.master}") driverArgs.cmd match { case "launch" => // TODO: We could add an env variable here and intercept it in `sc.addJar` that would @@ -98,9 +108,9 @@ object DriverClient extends Logging { try { Await.result(response.future, AkkaUtils.askTimeout(conf)) } catch { - case e: TimeoutException => (false, s"Master $master failed to respond in time") + case e: TimeoutException => (false, s"Error: Timed out sending message to $master") } - if (success) logInfo(message) else logError(message) + println(message) actorSystem.shutdown() actorSystem.awaitTermination() } diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala index d9e1c8a1b0..7774a5615c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala @@ -19,6 +19,8 @@ package org.apache.spark.deploy.client import scala.collection.mutable.ListBuffer +import org.apache.log4j.Level + /** * Command-line parser for the driver client. */ @@ -27,6 +29,7 @@ private[spark] class DriverClientArguments(args: Array[String]) { val defaultMemory = 512 var cmd: String = "" // 'launch' or 'kill' + var logLevel = Level.WARN // launch parameters var master: String = "" @@ -59,6 +62,10 @@ private[spark] class DriverClientArguments(args: Array[String]) { case ("--help" | "-h") :: tail => printUsageAndExit(0) + case ("--verbose" | "-v") :: tail => + logLevel = Level.INFO + parse(tail) + case "launch" :: _master :: _jarUrl :: _mainClass :: tail => cmd = "launch" master = _master @@ -90,6 +97,7 @@ private[spark] class DriverClientArguments(args: Array[String]) { | -c CORES, --cores CORES Number of cores to request (default: $defaultCores) | -m MEMORY, --memory MEMORY Megabytes of memory to request (default: $defaultMemory) | -s, --supervise Whether to restart the driver on failure + | -v, --verbose Print more debugging output """.stripMargin System.err.println(usage) System.exit(exitCode) -- cgit v1.2.3