From aaaa6731845495743aff4cc9bd64a54b9aa36c27 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Sun, 5 Jan 2014 13:57:42 -0800
Subject: Quiet Akka when remote lifecycle logging is disabled.

I noticed that when connecting to a standalone cluster, Spark emits a bunch
of Akka ERROR logs that make it seem like something is failing.

This patch does two things:

1. Akka dead letter logging is turned on/off according to the existing
   lifecycle spark property.
2. We explicitly silence Akka's EndpointWriter log in log4j. This is
   necessary because for some reason that log doesn't pick up on the
   lifecycle logging settings. After a few hours of debugging, this was
   the only solution I found that worked.
---
 core/src/main/scala/org/apache/spark/util/AkkaUtils.scala | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

(limited to 'core')

diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 362cea5e3e..5729334e24 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -21,6 +21,8 @@ import scala.concurrent.duration.{Duration, FiniteDuration}
 
 import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem}
 import com.typesafe.config.ConfigFactory
+import org.apache.log4j.{Level, Logger}
+
 import org.apache.spark.SparkConf
 
 /**
@@ -47,8 +49,13 @@ private[spark] object AkkaUtils {
 
     val akkaTimeout = conf.get("spark.akka.timeout", "100").toInt
     val akkaFrameSize = conf.get("spark.akka.frameSize", "10").toInt
-    val lifecycleEvents =
-      if (conf.get("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
+    val akkaLogLifecycleEvents = conf.get("spark.akka.logLifecycleEvents", "false").toBoolean
+    val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
+    if (!akkaLogLifecycleEvents) {
+      Option(Logger.getLogger("akka.remote.EndpointWriter")).map(l => l.setLevel(Level.FATAL))
+    }
+
+    val logAkkaConfig = if (conf.get("spark.akka.logAkkaConfig", "false").toBoolean) "on" else "off"
 
     val akkaHeartBeatPauses = conf.get("spark.akka.heartbeat.pauses", "600").toInt
     val akkaFailureDetector =
@@ -73,7 +80,10 @@ private[spark] object AkkaUtils {
       |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}MiB
       |akka.remote.netty.tcp.execution-pool-size = $akkaThreads
       |akka.actor.default-dispatcher.throughput = $akkaBatchSize
+      |akka.log-config-on-start = $logAkkaConfig
       |akka.remote.log-remote-lifecycle-events = $lifecycleEvents
+      |akka.log-dead-letters = $lifecycleEvents
+      |akka.log-dead-letters-during-shutdown = $lifecycleEvents
       """.stripMargin)
 
     val actorSystem = if (indestructible) {
--
cgit v1.2.3
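For reference, the workaround in the patch above is easy to exercise outside of Spark. The sketch below is a minimal standalone approximation, not Spark code: the object name QuietAkkaSketch and the hard-coded property value are invented for illustration, and it assumes log4j 1.x and Typesafe Config on the classpath. It silences the akka.remote.EndpointWriter logger the same way and renders the dead-letter settings into the config fragment Akka would consume.

    import com.typesafe.config.ConfigFactory
    import org.apache.log4j.{Level, Logger}

    object QuietAkkaSketch {
      def main(args: Array[String]): Unit = {
        // Stand-in for conf.get("spark.akka.logLifecycleEvents", "false").toBoolean.
        val akkaLogLifecycleEvents = false
        val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"

        // The EndpointWriter log ignores the lifecycle settings below, so raise
        // its log4j threshold to FATAL directly. Option(...) is a defensive guard
        // against a null logger, mirroring the patch.
        if (!akkaLogLifecycleEvents) {
          Option(Logger.getLogger("akka.remote.EndpointWriter"))
            .foreach(_.setLevel(Level.FATAL))
        }

        // The same settings the patch threads into Spark's generated Akka config.
        val akkaConf = ConfigFactory.parseString(
          s"""
            |akka.remote.log-remote-lifecycle-events = $lifecycleEvents
            |akka.log-dead-letters = $lifecycleEvents
            |akka.log-dead-letters-during-shutdown = $lifecycleEvents
          """.stripMargin)

        println(akkaConf.getString("akka.log-dead-letters")) // prints "off"
      }
    }

With both Spark properties left at their defaults, every setting above evaluates to "off", which is exactly the quieting behavior the commit message describes.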
From 94fdcda89638498f127abf3bb5231064182b4945 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Sun, 5 Jan 2014 15:10:05 -0800
Subject: Provide logging when attempts to connect to the master fail.

Without these messages it's less clear to the user what's going on.

One thing I realized while doing this is that Akka itself already retries
the initial association, so the retry we currently have is redundant with
Akka's.
---
 .../main/scala/org/apache/spark/deploy/client/Client.scala | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

(limited to 'core')

diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index 9bbd635ab9..481026eaa2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -24,7 +24,8 @@ import scala.concurrent.duration._
 
 import akka.actor._
 import akka.pattern.ask
-import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
+import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}
+
 import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
@@ -110,6 +111,12 @@ private[spark] class Client(
       }
     }
 
+    private def isPossibleMaster(remoteUrl: Address) = {
+      masterUrls.map(s => Master.toAkkaUrl(s))
+        .map(u => AddressFromURIString(u).hostPort)
+        .contains(remoteUrl.hostPort)
+    }
+
     override def receive = {
       case RegisteredApplication(appId_, masterUrl) =>
         appId = appId_
@@ -145,6 +152,9 @@ private[spark] class Client(
         logWarning(s"Connection to $address failed; waiting for master to reconnect...")
         markDisconnected()
 
+      case AssociationErrorEvent(cause, _, address, _) if isPossibleMaster(address) =>
+        logWarning(s"Could not connect to $address: $cause")
+
       case StopClient =>
         markDead()
         sender ! true
--
cgit v1.2.3

From 675d7eb4f064129d275a45df4c5c43f558638422 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Sun, 5 Jan 2014 21:23:14 -0800
Subject: Responding to Aaron's review
---
 core/src/main/scala/org/apache/spark/util/AkkaUtils.scala | 2 ++
 1 file changed, 2 insertions(+)

(limited to 'core')

diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 5729334e24..7df7e3d8e5 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -52,6 +52,8 @@ private[spark] object AkkaUtils {
     val akkaLogLifecycleEvents = conf.get("spark.akka.logLifecycleEvents", "false").toBoolean
     val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
     if (!akkaLogLifecycleEvents) {
+      // As a workaround for Akka issue #3787, we coerce the "EndpointWriter" log to be silent.
+      // See: https://www.assembla.com/spaces/akka/tickets/3787#/
       Option(Logger.getLogger("akka.remote.EndpointWriter")).map(l => l.setLevel(Level.FATAL))
     }
 
--
cgit v1.2.3
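For reference, the address matching behind isPossibleMaster in the second patch can also be sketched standalone. In the sketch below, toAkkaUrl is a hypothetical stand-in for Master.toAkkaUrl (the sparkMaster system name and /user/Master path mirror Spark's standalone master conventions) and the masterUrls values are made up; only akka-actor is assumed on the classpath. The idea is that Address.hostPort yields the "system@host:port" portion of an address, ignoring the protocol and actor path, so the remote address carried by an AssociationErrorEvent can be compared against the configured masters.

    import akka.actor.{Address, AddressFromURIString}

    object IsPossibleMasterSketch {
      // Hypothetical stand-in for Master.toAkkaUrl: spark://host:port becomes a
      // full Akka actor URL for the standalone master.
      def toAkkaUrl(sparkUrl: String): String = {
        val hostPort = sparkUrl.stripPrefix("spark://")
        s"akka.tcp://sparkMaster@$hostPort/user/Master"
      }

      def main(args: Array[String]): Unit = {
        val masterUrls = Seq("spark://master1:7077", "spark://master2:7077")

        // Same shape as the patched Client.isPossibleMaster: normalize each master
        // URL to an akka.actor.Address and compare on hostPort ("system@host:port").
        def isPossibleMaster(remoteUrl: Address): Boolean =
          masterUrls.map(toAkkaUrl)
            .map(u => AddressFromURIString(u).hostPort)
            .contains(remoteUrl.hostPort)

        // The kind of address an AssociationErrorEvent would carry for master1.
        val failed = Address("akka.tcp", "sparkMaster", "master1", 7077)
        println(isPossibleMaster(failed)) // prints "true"
      }
    }

The guard matters because remoting raises AssociationErrorEvent for any failed peer; filtering on the known master addresses keeps the new warning limited to genuine master connection failures.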