author    Prashant Sharma <prashant.s@imaginea.com>    2013-11-22 19:46:39 +0530
committer Prashant Sharma <prashant.s@imaginea.com>    2013-11-25 14:13:21 +0530
commit    77929cfeed95905106f5b3891e8de1b1c312d119 (patch)
tree      18b718f26b4f55ba1b7cfe53d74c873d98e8d216
parent    95d8dbce91f49467050250d5cf3671aaaa648d76 (diff)
Fine-tuned defaults for Akka and restored tracking of disassociated events, since they are delivered when a remote TCP socket is closed. Also gave the transport failure detector a larger heartbeat interval, as it is mostly unneeded now that remote death watch is used instead.
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala                             23
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala                             12
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala            11
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala   3
-rw-r--r--  core/src/main/scala/org/apache/spark/util/AkkaUtils.scala                                   12
5 files changed, 40 insertions, 21 deletions
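
For orientation before the hunks: the change applies the same pattern in Master, Worker, and both coarse-grained backends. Each actor subscribes to Akka's remoting lifecycle events and treats a DisassociatedEvent as a remote-peer disconnect, alongside the existing death-watch Terminated handling. A minimal standalone sketch of that pattern, with an illustrative actor name not taken from the patch:

    import akka.actor.{Actor, ActorLogging, Terminated}
    import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}

    // Minimal sketch of the subscribe-and-handle pattern used throughout this commit.
    class PeerWatcher extends Actor with ActorLogging {
      override def preStart() {
        // Disassociations are published on the event stream rather than sent to
        // watchers, so every interested actor must subscribe explicitly.
        context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
      }

      def receive = {
        case DisassociatedEvent(_, remoteAddress, _) =>
          // Delivered when the remote TCP socket closes; clean up state keyed by address.
          log.info(s"$remoteAddress got disassociated, removing it.")
        case Terminated(actor) =>
          // Death watch still reports actors this one has called context.watch() on.
          log.info(s"$actor got terminated, removing it.")
      }
    }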
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index a7cfc256a9..25f5927128 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -17,8 +17,9 @@
package org.apache.spark.deploy.master
-import java.util.Date
import java.text.SimpleDateFormat
+import java.util.concurrent.TimeUnit
+import java.util.Date
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.Await
@@ -28,6 +29,7 @@ import scala.concurrent.duration.{Duration, FiniteDuration}
import akka.actor._
import akka.pattern.ask
import akka.remote._
+import akka.serialization.SerializationExtension
import akka.util.Timeout
import org.apache.spark.{Logging, SparkException}
@@ -40,11 +42,6 @@ import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
import org.apache.spark.deploy.DeployMessages.KillExecutor
import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
-import scala.Some
-import akka.actor.Terminated
-import akka.serialization.SerializationExtension
-import java.util.concurrent.TimeUnit
-
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
import context.dispatcher
@@ -102,6 +99,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
override def preStart() {
logInfo("Starting Spark master at " + masterUrl)
// Listen for remote client disconnection events, since they don't go through Akka's watch()
+ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
webUi.start()
masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort.get
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
@@ -267,11 +265,20 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
case Terminated(actor) => {
// The disconnected actor could've been either a worker or an app; remove whichever of
// those we have an entry for in the corresponding actor hashmap
+ logInfo(s"$actor got terminated, removing it.")
actorToWorker.get(actor).foreach(removeWorker)
actorToApp.get(actor).foreach(finishApplication)
if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
}
+ case DisassociatedEvent(_, address, _) => {
+ // The disconnected client could've been either a worker or an app; remove whichever it was
+ logInfo(s"$address got disassociated, removing it.")
+ addressToWorker.get(address).foreach(removeWorker)
+ addressToApp.get(address).foreach(finishApplication)
+ if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
+ }
+
case RequestMasterState => {
sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray,
state)
@@ -431,6 +438,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
exec.id, ExecutorState.LOST, Some("worker lost"), None)
exec.application.removeExecutor(exec)
}
+ context.stop(worker.actor)
+ context.unwatch(worker.actor)
persistenceEngine.removeWorker(worker)
}
@@ -493,6 +502,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
app.driver ! ApplicationRemoved(state.toString)
}
persistenceEngine.removeApplication(app)
+ context.stop(app.driver)
+ context.unwatch(app.driver)
schedule()
}
}
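
The two handlers above work because the Master keeps lookup maps keyed both by ActorRef (consulted on Terminated) and by remote Address (consulted on DisassociatedEvent, which carries no ActorRef). A hypothetical sketch of that dual bookkeeping; the real code keeps pairs such as actorToWorker/addressToWorker and actorToApp/addressToApp:

    import akka.actor.{ActorRef, Address}
    import scala.collection.mutable.HashMap

    // Hypothetical dual-keyed registry mirroring the Master's bookkeeping.
    class PeerRegistry[T] {
      private val byActor = new HashMap[ActorRef, T]
      private val byAddress = new HashMap[Address, T]

      def register(actor: ActorRef, peer: T) {
        byActor(actor) = peer
        // A remote ActorRef's path carries the peer's address, which is all
        // that a DisassociatedEvent gives us to look things up by.
        byAddress(actor.path.address) = peer
      }

      def removeByActor(actor: ActorRef): Option[T] = byActor.remove(actor)
      def removeByAddress(address: Address): Option[T] = byAddress.remove(address)
    }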
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 9472c9a619..3a7d0b859b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -17,14 +17,16 @@
package org.apache.spark.deploy.worker
+import java.io.File
import java.text.SimpleDateFormat
import java.util.Date
-import java.io.File
import scala.collection.mutable.HashMap
import scala.concurrent.duration._
import akka.actor._
+import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
+
import org.apache.spark.Logging
import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
@@ -36,10 +38,8 @@ import org.apache.spark.deploy.DeployMessages.WorkerStateResponse
import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
import org.apache.spark.deploy.DeployMessages.KillExecutor
import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
-import scala.Some
import org.apache.spark.deploy.DeployMessages.Heartbeat
import org.apache.spark.deploy.DeployMessages.RegisteredWorker
-import akka.remote.DisassociatedEvent
import org.apache.spark.deploy.DeployMessages.LaunchExecutor
import org.apache.spark.deploy.DeployMessages.RegisterWorker
@@ -124,7 +124,7 @@ private[spark] class Worker(
logInfo("Spark home: " + sparkHome)
createWorkDir()
webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
-
+ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
webUi.start()
registerWithMaster()
@@ -249,6 +249,10 @@ private[spark] class Worker(
logInfo(s"$actor_ terminated !")
masterDisconnected()
+ case x: DisassociatedEvent =>
+      logInfo(s"$x Disassociated!")
+ masterDisconnected()
+
case RequestWorkerState => {
sender ! WorkerStateResponse(host, port, workerId, executors.values.toList,
finishedExecutors.values.toList, activeMasterUrl, cores, memory,
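
Since the Worker's new handler matches only on the event's type, it can be exercised without dropping a real connection by publishing a synthetic event on the event stream. A hypothetical probe, assuming `system` is the ActorSystem hosting an already-subscribed Worker and the addresses are made up:

    import akka.actor.AddressFromURIString
    import akka.remote.DisassociatedEvent

    // Hypothetical probe: fabricate a disassociation so any subscriber
    // (here, the Worker) runs its masterDisconnected() path.
    val event = DisassociatedEvent(
      localAddress = AddressFromURIString("akka.tcp://sparkWorker@localhost:7078"),
      remoteAddress = AddressFromURIString("akka.tcp://sparkMaster@localhost:7077"),
      inbound = false)
    system.eventStream.publish(event)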
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index a98ec06be9..2818a775d0 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -26,11 +26,6 @@ import org.apache.spark.Logging
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.{Utils, AkkaUtils}
-import akka.remote.DisassociatedEvent
-import akka.remote.AssociationErrorEvent
-import akka.remote.DisassociatedEvent
-import akka.actor.Terminated
-
private[spark] class CoarseGrainedExecutorBackend(
driverUrl: String,
@@ -82,7 +77,11 @@ private[spark] class CoarseGrainedExecutorBackend(
}
case Terminated(actor) =>
- logError(s"Driver $actor terminated or disconnected! Shutting down.")
+      logError(s"Driver $actor terminated. Shutting down.")
+ System.exit(1)
+
+ case x: DisassociatedEvent =>
+ logError(s"Driver $x disassociated! Shutting down.")
System.exit(1)
case StopExecutor =>
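
Either event is fatal for the executor: with the driver gone there is no one to send it work, so exiting nonzero lets the cluster manager notice and reschedule. The two cases could share one helper; a hedged sketch, not code from the patch:

    // Illustrative shared exit path for the two fatal cases above.
    def driverLost(reason: String) {
      logError(reason + " Shutting down.")
      System.exit(1)
    }
    // case Terminated(actor)     => driverLost(s"Driver $actor terminated.")
    // case x: DisassociatedEvent => driverLost(s"Driver $x disassociated!")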
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 821c30a119..e316f6b41f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -121,6 +121,9 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
case Terminated(actor) =>
actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated"))
+ case DisassociatedEvent(_, address, _) =>
+ addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disassociated"))
+
}
// Make fake resource offers on all executors
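
One subtlety: subscribing to classOf[RemotingLifecycleEvent], as the actors in this commit do, also delivers associations and association errors, which these receive blocks silently route to unhandled(). When only disconnects matter, a narrower subscription works too; a hedged alternative, not what the patch does:

    // Hedged alternative: subscribe to just the one event type actually handled.
    context.system.eventStream.subscribe(self, classOf[DisassociatedEvent])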
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 2a831382df..90a5387b2b 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -44,9 +44,11 @@ private[spark] object AkkaUtils {
val akkaFrameSize = System.getProperty("spark.akka.frameSize", "10").toInt
val lifecycleEvents = if (System.getProperty("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
- val akkaHeartBeatPauses = System.getProperty("spark.akka.pauses", "30").toInt
- val akkaFailureDetector = System.getProperty("spark.akka.failure-detector.threshold", "30").toInt
- val akkaHeartBeatInterval = System.getProperty("spark.akka.heartbeat.interval", "3").toInt
+ val akkaHeartBeatPauses = System.getProperty("spark.akka.pauses", "60").toInt
+ val akkaFailureDetector = System.getProperty("spark.akka.failure-detector.threshold", "12.0").toDouble
+    // Since we have our own heartbeat mechanism and TCP already tracks connections,
+    // the transport failure detector adds little value; a relatively large interval for it suffices.
+ val akkaHeartBeatInterval = System.getProperty("spark.akka.heartbeat.interval", "3").toInt
val akkaConf = ConfigFactory.parseString(
s"""
@@ -56,8 +58,8 @@ private[spark] object AkkaUtils {
|akka.remote.watch-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
|akka.remote.watch-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
|akka.remote.watch-failure-detector.threshold = $akkaFailureDetector
- |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
- |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
+ |akka.remote.transport-failure-detector.heartbeat-interval = 30 s
+ |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = ${akkaHeartBeatPauses + 10} s
|akka.remote.transport-failure-detector.threshold = $akkaFailureDetector
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
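
These tuned defaults stay overridable: each value is read from a system property before the ActorSystem is built, so a deployment can raise or lower them. An illustrative override; the property names come from the code above, the values are examples only:

    // Illustrative overrides; must be set before AkkaUtils creates the ActorSystem.
    System.setProperty("spark.akka.pauses", "120")                      // acceptable heartbeat pause, in seconds
    System.setProperty("spark.akka.failure-detector.threshold", "12.0") // phi accrual threshold
    System.setProperty("spark.akka.heartbeat.interval", "3")            // watch heartbeat interval, in seconds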