aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2014-01-05 22:37:36 -0800
committerPatrick Wendell <pwendell@gmail.com>2014-01-05 22:37:36 -0800
commita2e7e0497484554f86bd71e93705eb0422b1512b (patch)
treea1327edd944d8cc818aa9dcf7033c7db37b1d3a3 /core
parent5b0986a1d675f0f9d7d14b3d48fdadcb4f7055b7 (diff)
parent675d7eb4f064129d275a45df4c5c43f558638422 (diff)
downloadspark-a2e7e0497484554f86bd71e93705eb0422b1512b.tar.gz
spark-a2e7e0497484554f86bd71e93705eb0422b1512b.tar.bz2
spark-a2e7e0497484554f86bd71e93705eb0422b1512b.zip
Merge pull request #333 from pwendell/logging-silence
Quiet ERROR-level Akka Logs This fixes an issue I've seen where akka logs a bunch of things at ERROR level when connecting to a standalone cluster, even in the normal case. I noticed that even when lifecycle logging was disabled, the netty code inside of akka still logged away via akka's EndpointWriter class. There are also some other log streams that I think are new in akka 2.2.1 that I've disabled. Finally, I added some better logging to the standalone client. This makes it more clear when a connection failure occurs what is going on. Previously it never explicitly said if a connection attempt had failed. The commit messages here have some more detail.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/client/Client.scala12
-rw-r--r--core/src/main/scala/org/apache/spark/util/AkkaUtils.scala16
2 files changed, 25 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index 9bbd635ab9..481026eaa2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -24,7 +24,8 @@ import scala.concurrent.duration._
import akka.actor._
import akka.pattern.ask
-import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
+import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}
+
import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
@@ -110,6 +111,12 @@ private[spark] class Client(
}
}
+ private def isPossibleMaster(remoteUrl: Address) = {
+ masterUrls.map(s => Master.toAkkaUrl(s))
+ .map(u => AddressFromURIString(u).hostPort)
+ .contains(remoteUrl.hostPort)
+ }
+
override def receive = {
case RegisteredApplication(appId_, masterUrl) =>
appId = appId_
@@ -145,6 +152,9 @@ private[spark] class Client(
logWarning(s"Connection to $address failed; waiting for master to reconnect...")
markDisconnected()
+ case AssociationErrorEvent(cause, _, address, _) if isPossibleMaster(address) =>
+ logWarning(s"Could not connect to $address: $cause")
+
case StopClient =>
markDead()
sender ! true
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 362cea5e3e..7df7e3d8e5 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -21,6 +21,8 @@ import scala.concurrent.duration.{Duration, FiniteDuration}
import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem}
import com.typesafe.config.ConfigFactory
+import org.apache.log4j.{Level, Logger}
+
import org.apache.spark.SparkConf
/**
@@ -47,8 +49,15 @@ private[spark] object AkkaUtils {
val akkaTimeout = conf.get("spark.akka.timeout", "100").toInt
val akkaFrameSize = conf.get("spark.akka.frameSize", "10").toInt
- val lifecycleEvents =
- if (conf.get("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
+ val akkaLogLifecycleEvents = conf.get("spark.akka.logLifecycleEvents", "false").toBoolean
+ val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
+ if (!akkaLogLifecycleEvents) {
+ // As a workaround for Akka issue #3787, we coerce the "EndpointWriter" log to be silent.
+ // See: https://www.assembla.com/spaces/akka/tickets/3787#/
+ Option(Logger.getLogger("akka.remote.EndpointWriter")).map(l => l.setLevel(Level.FATAL))
+ }
+
+ val logAkkaConfig = if (conf.get("spark.akka.logAkkaConfig", "false").toBoolean) "on" else "off"
val akkaHeartBeatPauses = conf.get("spark.akka.heartbeat.pauses", "600").toInt
val akkaFailureDetector =
@@ -73,7 +82,10 @@ private[spark] object AkkaUtils {
|akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}MiB
|akka.remote.netty.tcp.execution-pool-size = $akkaThreads
|akka.actor.default-dispatcher.throughput = $akkaBatchSize
+ |akka.log-config-on-start = $logAkkaConfig
|akka.remote.log-remote-lifecycle-events = $lifecycleEvents
+ |akka.log-dead-letters = $lifecycleEvents
+ |akka.log-dead-letters-during-shutdown = $lifecycleEvents
""".stripMargin)
val actorSystem = if (indestructible) {