aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/client/Client.scala12
-rw-r--r--core/src/main/scala/org/apache/spark/util/AkkaUtils.scala16
2 files changed, 25 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index 9bbd635ab9..481026eaa2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -24,7 +24,8 @@ import scala.concurrent.duration._
import akka.actor._
import akka.pattern.ask
-import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
+import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}
+
import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
@@ -110,6 +111,12 @@ private[spark] class Client(
}
}
+ private def isPossibleMaster(remoteUrl: Address) = {
+ masterUrls.map(s => Master.toAkkaUrl(s))
+ .map(u => AddressFromURIString(u).hostPort)
+ .contains(remoteUrl.hostPort)
+ }
+
override def receive = {
case RegisteredApplication(appId_, masterUrl) =>
appId = appId_
@@ -145,6 +152,9 @@ private[spark] class Client(
logWarning(s"Connection to $address failed; waiting for master to reconnect...")
markDisconnected()
+ case AssociationErrorEvent(cause, _, address, _) if isPossibleMaster(address) =>
+ logWarning(s"Could not connect to $address: $cause")
+
case StopClient =>
markDead()
sender ! true
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 362cea5e3e..7df7e3d8e5 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -21,6 +21,8 @@ import scala.concurrent.duration.{Duration, FiniteDuration}
import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem}
import com.typesafe.config.ConfigFactory
+import org.apache.log4j.{Level, Logger}
+
import org.apache.spark.SparkConf
/**
@@ -47,8 +49,15 @@ private[spark] object AkkaUtils {
val akkaTimeout = conf.get("spark.akka.timeout", "100").toInt
val akkaFrameSize = conf.get("spark.akka.frameSize", "10").toInt
- val lifecycleEvents =
- if (conf.get("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
+ val akkaLogLifecycleEvents = conf.get("spark.akka.logLifecycleEvents", "false").toBoolean
+ val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
+ if (!akkaLogLifecycleEvents) {
+ // As a workaround for Akka issue #3787, we coerce the "EndpointWriter" log to be silent.
+ // See: https://www.assembla.com/spaces/akka/tickets/3787#/
+ Option(Logger.getLogger("akka.remote.EndpointWriter")).map(l => l.setLevel(Level.FATAL))
+ }
+
+ val logAkkaConfig = if (conf.get("spark.akka.logAkkaConfig", "false").toBoolean) "on" else "off"
val akkaHeartBeatPauses = conf.get("spark.akka.heartbeat.pauses", "600").toInt
val akkaFailureDetector =
@@ -73,7 +82,10 @@ private[spark] object AkkaUtils {
|akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}MiB
|akka.remote.netty.tcp.execution-pool-size = $akkaThreads
|akka.actor.default-dispatcher.throughput = $akkaBatchSize
+ |akka.log-config-on-start = $logAkkaConfig
|akka.remote.log-remote-lifecycle-events = $lifecycleEvents
+ |akka.log-dead-letters = $lifecycleEvents
+ |akka.log-dead-letters-during-shutdown = $lifecycleEvents
""".stripMargin)
val actorSystem = if (indestructible) {