diff options
author | Nong Li <nong@databricks.com> | 2015-12-22 13:27:28 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-12-22 13:27:28 -0800 |
commit | 575a1327976202614a6d3268918ae8dad49fcd72 (patch) | |
tree | 5085e5983a8d5e4131123895f8a186cc440214e4 | |
parent | 7c970f9093bda0a789d7d6e43c72a6d317fc3723 (diff) | |
download | spark-575a1327976202614a6d3268918ae8dad49fcd72.tar.gz spark-575a1327976202614a6d3268918ae8dad49fcd72.tar.bz2 spark-575a1327976202614a6d3268918ae8dad49fcd72.zip |
[SPARK-12471][CORE] Spark daemons will log their pid on start up.
Author: Nong Li <nong@databricks.com>
Closes #10422 from nongli/12471-pids.
9 files changed, 35 insertions, 20 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala index 7fc96e4f76..c514a1a86b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala @@ -21,11 +21,11 @@ import java.util.concurrent.CountDownLatch import scala.collection.JavaConverters._ -import org.apache.spark.{Logging, SparkConf, SecurityManager} +import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.network.TransportContext import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.sasl.SaslServerBootstrap -import org.apache.spark.network.server.{TransportServerBootstrap, TransportServer} +import org.apache.spark.network.server.{TransportServer, TransportServerBootstrap} import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler import org.apache.spark.network.util.TransportConf import org.apache.spark.util.{ShutdownHookManager, Utils} @@ -108,6 +108,7 @@ object ExternalShuffleService extends Logging { private[spark] def main( args: Array[String], newShuffleService: (SparkConf, SecurityManager) => ExternalShuffleService): Unit = { + Utils.initDaemon(log) val sparkConf = new SparkConf Utils.loadDefaultSparkProperties(sparkConf) val securityManager = new SecurityManager(sparkConf) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index f31fef0ecc..0bc0cb1c15 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -23,14 +23,12 @@ import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse} import com.google.common.cache._ import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder} - import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, ApplicationsListResource, - UIRoot} +import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, ApplicationsListResource, UIRoot} import org.apache.spark.ui.{SparkUI, UIUtils, WebUI} import org.apache.spark.ui.JettyUtils._ -import org.apache.spark.util.{ShutdownHookManager, SignalLogger, Utils} +import org.apache.spark.util.{ShutdownHookManager, Utils} /** * A web server that renders SparkUIs of completed applications. @@ -223,7 +221,7 @@ object HistoryServer extends Logging { val UI_PATH_PREFIX = "/history" def main(argStrings: Array[String]) { - SignalLogger.register(log) + Utils.initDaemon(log) new HistoryServerArguments(conf, argStrings) initSecurity() val securityManager = new SecurityManager(conf) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 5d97c63918..bd3d981ce0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -45,7 +45,7 @@ import org.apache.spark.metrics.MetricsSystem import org.apache.spark.scheduler.{EventLoggingListener, ReplayListenerBus} import org.apache.spark.serializer.{JavaSerializer, Serializer} import org.apache.spark.ui.SparkUI -import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils} +import org.apache.spark.util.{ThreadUtils, Utils} private[deploy] class Master( override val rpcEnv: RpcEnv, @@ -1087,7 +1087,7 @@ private[deploy] object Master extends Logging { val ENDPOINT_NAME = "Master" def main(argStrings: Array[String]) { - SignalLogger.register(log) + Utils.initDaemon(log) val conf = new SparkConf val args = new MasterArguments(argStrings, conf) val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf) diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala index 389eff5e06..89f1a8671f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala +++ b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala @@ -22,7 +22,7 @@ import java.util.concurrent.CountDownLatch import org.apache.spark.deploy.mesos.ui.MesosClusterUI import org.apache.spark.deploy.rest.mesos.MesosRestServer import org.apache.spark.scheduler.cluster.mesos._ -import org.apache.spark.util.{ShutdownHookManager, SignalLogger} +import org.apache.spark.util.{ShutdownHookManager, Utils} import org.apache.spark.{Logging, SecurityManager, SparkConf} /* @@ -92,7 +92,7 @@ private[mesos] class MesosClusterDispatcher( private[mesos] object MesosClusterDispatcher extends Logging { def main(args: Array[String]) { - SignalLogger.register(log) + Utils.initDaemon(log) val conf = new SparkConf val dispatcherArgs = new MesosClusterDispatcherArguments(args, conf) conf.setMaster(dispatcherArgs.masterUrl) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index f41efb097b..84e7b366bc 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -686,7 +686,7 @@ private[deploy] object Worker extends Logging { val ENDPOINT_NAME = "Worker" def main(argStrings: Array[String]) { - SignalLogger.register(log) + Utils.initDaemon(log) val conf = new SparkConf val args = new WorkerArguments(argStrings, conf) val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores, diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 77c88baa9b..edbd7225ca 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -20,11 +20,8 @@ package org.apache.spark.executor import java.net.URL import java.nio.ByteBuffer -import org.apache.hadoop.conf.Configuration - import scala.collection.mutable import scala.util.{Failure, Success} - import org.apache.spark.rpc._ import org.apache.spark._ import org.apache.spark.TaskState.TaskState @@ -33,7 +30,7 @@ import org.apache.spark.deploy.worker.WorkerWatcher import org.apache.spark.scheduler.TaskDescription import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.serializer.SerializerInstance -import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils} +import org.apache.spark.util.{ThreadUtils, Utils} private[spark] class CoarseGrainedExecutorBackend( override val rpcEnv: RpcEnv, @@ -146,7 +143,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { workerUrl: Option[String], userClassPath: Seq[URL]) { - SignalLogger.register(log) + Utils.initDaemon(log) SparkHadoopUtil.get.runAsSparkUser { () => // Debug code diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala index c9f18ebc7f..d85465eb25 100644 --- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala @@ -25,11 +25,11 @@ import org.apache.mesos.protobuf.ByteString import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver} import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _} -import org.apache.spark.{Logging, TaskState, SparkConf, SparkEnv} +import org.apache.spark.{Logging, SparkConf, SparkEnv, TaskState} import org.apache.spark.TaskState.TaskState import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.scheduler.cluster.mesos.MesosTaskLaunchData -import org.apache.spark.util.{SignalLogger, Utils} +import org.apache.spark.util.Utils private[spark] class MesosExecutorBackend extends MesosExecutor @@ -121,7 +121,7 @@ private[spark] class MesosExecutorBackend */ private[spark] object MesosExecutorBackend extends Logging { def main(args: Array[String]) { - SignalLogger.register(log) + Utils.initDaemon(log) // Create a new Executor and start it running val runner = new MesosExecutorBackend() new MesosExecutorDriver(runner).run() diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index fce89dfccf..1a07f7ca7e 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -43,6 +43,7 @@ import org.apache.hadoop.security.UserGroupInformation import org.apache.log4j.PropertyConfigurator import org.eclipse.jetty.util.MultiException import org.json4s._ +import org.slf4j.Logger import tachyon.TachyonURI import tachyon.client.{TachyonFS, TachyonFile} @@ -2221,6 +2222,23 @@ private[spark] object Utils extends Logging { def tempFileWith(path: File): File = { new File(path.getAbsolutePath + "." + UUID.randomUUID()) } + + /** + * Returns the name of this JVM process. This is OS dependent but typically (OSX, Linux, Windows), + * this is formatted as PID@hostname. + */ + def getProcessName(): String = { + ManagementFactory.getRuntimeMXBean().getName() + } + + /** + * Utility function that should be called early in `main()` for daemons to set up some common + * diagnostic state. + */ + def initDaemon(log: Logger): Unit = { + log.info(s"Started daemon with process name: ${Utils.getProcessName()}") + SignalLogger.register(log) + } } /** diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index a4fd0c3ce9..3e3f0382f6 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -67,6 +67,7 @@ object HiveThriftServer2 extends Logging { } def main(args: Array[String]) { + Utils.initDaemon(log) val optionsProcessor = new HiveServerServerOptionsProcessor("HiveThriftServer2") if (!optionsProcessor.process(args)) { System.exit(-1) |