From 293a0af5a1def95e47d9188f42957083f5adf3b8 Mon Sep 17 00:00:00 2001
From: Aaron Davidson
Date: Wed, 18 Dec 2013 18:51:02 -0800
Subject: Raise spark.akka.askTimeout default to 30 seconds and centralize its
 handling in AkkaUtils

In experimental clusters we've observed that a 10 second timeout was
insufficient, despite having a low number of nodes and a relatively small
workload (16 nodes, <1.5 TB data). This would cause an entire job to fail at
the beginning of the reduce phase. There is no particular reason for this
value to be small, as a timeout should only occur in an exceptional
situation.

Also centralized the reading of spark.akka.askTimeout into AkkaUtils (surely
this can later be cleaned up to use Typesafe Config). Finally, deleted some
lurking implicit timeouts. If anyone can think of a reason they should still
be there, please let me know.
---
 .../main/scala/org/apache/spark/MapOutputTracker.scala  |  5 ++---
 .../scala/org/apache/spark/deploy/client/Client.scala   |  6 +++---
 .../scala/org/apache/spark/deploy/master/Master.scala   | 17 ++++++-----------
 .../apache/spark/deploy/master/ui/ApplicationPage.scala | 14 +++++---------
 .../org/apache/spark/deploy/master/ui/IndexPage.scala   | 16 ++++++----------
 .../org/apache/spark/deploy/master/ui/MasterWebUI.scala | 10 +++-------
 .../org/apache/spark/deploy/worker/ui/IndexPage.scala   |  4 ++--
 .../org/apache/spark/deploy/worker/ui/WorkerWebUI.scala | 10 +++-------
 .../cluster/CoarseGrainedSchedulerBackend.scala         | 10 ++++------
 .../org/apache/spark/storage/BlockManagerMaster.scala   |  7 ++++---
 .../apache/spark/storage/BlockManagerMasterActor.scala  | 11 ++++-------
 .../org/apache/spark/ui/storage/BlockManagerUI.scala    |  3 ---
 .../main/scala/org/apache/spark/util/AkkaUtils.scala    |  6 ++++++
 13 files changed, 48 insertions(+), 71 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 10fae5af9f..ccffcc356c 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -29,8 +29,7 @@ import akka.pattern.ask
 
 import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.{MetadataCleanerType, Utils, MetadataCleaner, TimeStampedHashMap}
-
+import org.apache.spark.util.{AkkaUtils, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
 
 private[spark] sealed trait MapOutputTrackerMessage
 private[spark] case class GetMapOutputStatuses(shuffleId: Int, requester: String)
@@ -53,7 +52,7 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
 
 private[spark] class MapOutputTracker extends Logging {
 
-  private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+  private val timeout = AkkaUtils.askTimeout
 
   // Set to the MapOutputTrackerActor living on the driver
   var trackerActor: Either[ActorRef, ActorSelection] = _
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index 4d95efa73a..953755e40d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -23,14 +23,14 @@
 import scala.concurrent.duration._
 import scala.concurrent.Await
 
 import akka.actor._
-import akka.pattern.AskTimeoutException
 import akka.pattern.ask
-import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent, AssociationErrorEvent}
+import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent}
 
 import org.apache.spark.{SparkException, Logging}
 import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.Master
+import org.apache.spark.util.AkkaUtils
 
 /**
@@ -178,7 +178,7 @@ private[spark] class Client(
   def stop() {
     if (actor != null) {
       try {
-        val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+        val timeout = AkkaUtils.askTimeout
         val future = actor.ask(StopClient)(timeout)
         Await.result(future, timeout)
       } catch {
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index c627dd3806..7b2b1c3327 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -18,19 +18,16 @@
 package org.apache.spark.deploy.master
 
 import java.text.SimpleDateFormat
-import java.util.concurrent.TimeUnit
 import java.util.Date
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.concurrent.Await
 import scala.concurrent.duration._
-import scala.concurrent.duration.{Duration, FiniteDuration}
 
 import akka.actor._
 import akka.pattern.ask
-import akka.remote._
+import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 import akka.serialization.SerializationExtension
-import akka.util.Timeout
 
 import org.apache.spark.{Logging, SparkException}
 import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
@@ -38,7 +35,7 @@ import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.MasterMessages._
 import org.apache.spark.deploy.master.ui.MasterWebUI
 import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.util.{Utils, AkkaUtils}
+import org.apache.spark.util.{AkkaUtils, Utils}
 
 private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
   import context.dispatcher
@@ -537,12 +534,10 @@ private[spark] object Master {
 
   def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int, Int) = {
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
-    val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), name = actorName)
-    val timeoutDuration: FiniteDuration = Duration.create(
-      System.getProperty("spark.akka.askTimeout", "10").toLong, TimeUnit.SECONDS)
-    implicit val timeout = Timeout(timeoutDuration)
-    val respFuture = actor ? RequestWebUIPort // ask pattern
-    val resp = Await.result(respFuture, timeoutDuration).asInstanceOf[WebUIPortResponse]
+    val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), actorName)
+    val timeout = AkkaUtils.askTimeout
+    val respFuture = actor.ask(RequestWebUIPort)(timeout)
+    val resp = Await.result(respFuture, timeout).asInstanceOf[WebUIPortResponse]
     (actorSystem, boundPort, resp.webUIBoundPort)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index 3b983c19eb..dbb0cb90f5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -17,32 +17,28 @@
 
 package org.apache.spark.deploy.master.ui
 
+import scala.concurrent.Await
 import scala.xml.Node
 
 import akka.pattern.ask
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
 import javax.servlet.http.HttpServletRequest
-
 import net.liftweb.json.JsonAST.JValue
 
-import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
 import org.apache.spark.deploy.JsonProtocol
+import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
 import org.apache.spark.deploy.master.ExecutorInfo
 import org.apache.spark.ui.UIUtils
 import org.apache.spark.util.Utils
 
 private[spark] class ApplicationPage(parent: MasterWebUI) {
   val master = parent.masterActorRef
-  implicit val timeout = parent.timeout
+  val timeout = parent.timeout
 
   /** Executor details for a particular application */
   def renderJson(request: HttpServletRequest): JValue = {
     val appId = request.getParameter("appId")
     val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
-    val state = Await.result(stateFuture, 30 seconds)
+    val state = Await.result(stateFuture, timeout)
     val app = state.activeApps.find(_.id == appId).getOrElse({
       state.completedApps.find(_.id == appId).getOrElse(null)
     })
@@ -53,7 +49,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) {
   def render(request: HttpServletRequest): Seq[Node] = {
     val appId = request.getParameter("appId")
     val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
-    val state = Await.result(stateFuture, 30 seconds)
+    val state = Await.result(stateFuture, timeout)
     val app = state.activeApps.find(_.id == appId).getOrElse({
       state.completedApps.find(_.id == appId).getOrElse(null)
     })
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
index 65e7a14e7a..4ef762892c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
@@ -17,37 +17,33 @@
 
 package org.apache.spark.deploy.master.ui
 
-import javax.servlet.http.HttpServletRequest
-
+import scala.concurrent.Await
 import scala.xml.Node
-import scala.concurrent.Await
 
 import akka.pattern.ask
-import scala.concurrent.duration._
-
+import javax.servlet.http.HttpServletRequest
 import net.liftweb.json.JsonAST.JValue
 
-import org.apache.spark.deploy.DeployWebUI
+import org.apache.spark.deploy.{DeployWebUI, JsonProtocol}
 import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
-import org.apache.spark.deploy.JsonProtocol
 import org.apache.spark.deploy.master.{ApplicationInfo, WorkerInfo}
 import org.apache.spark.ui.UIUtils
 import org.apache.spark.util.Utils
 
 private[spark] class IndexPage(parent: MasterWebUI) {
   val master = parent.masterActorRef
-  implicit val timeout = parent.timeout
+  val timeout = parent.timeout
 
   def renderJson(request: HttpServletRequest): JValue = {
     val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
-    val state = Await.result(stateFuture, 30 seconds)
+    val state = Await.result(stateFuture, timeout)
     JsonProtocol.writeMasterState(state)
   }
 
   /** Index view listing applications and executors */
   def render(request: HttpServletRequest): Seq[Node] = {
     val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
-    val state = Await.result(stateFuture, 30 seconds)
+    val state = Await.result(stateFuture, timeout)
 
     val workerHeaders = Seq("Id", "Address", "State", "Cores", "Memory")
     val workers = state.workers.sortBy(_.id)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index a211ce2b42..9ab594b682 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -17,25 +17,21 @@
 
 package org.apache.spark.deploy.master.ui
 
-import scala.concurrent.duration._
-
 import javax.servlet.http.HttpServletRequest
-
 import org.eclipse.jetty.server.{Handler, Server}
 
-import org.apache.spark.{Logging}
+import org.apache.spark.Logging
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.ui.JettyUtils
 import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{AkkaUtils, Utils}
 
 /**
  * Web UI server for the standalone master.
  */
 private[spark] class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
-  implicit val timeout = Duration.create(
-    System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+  val timeout = AkkaUtils.askTimeout
 
   val host = Utils.localHostName()
   val port = requestedPort
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
index 1a768d501f..0d59048313 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
@@ -42,13 +42,13 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
 
   def renderJson(request: HttpServletRequest): JValue = {
     val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
-    val workerState = Await.result(stateFuture, 30 seconds)
+    val workerState = Await.result(stateFuture, timeout)
     JsonProtocol.writeWorkerState(workerState)
   }
 
   def render(request: HttpServletRequest): Seq[Node] = {
     val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
-    val workerState = Await.result(stateFuture, 30 seconds)
+    val workerState = Await.result(stateFuture, timeout)
 
     val executorHeaders = Seq("ExecutorID", "Cores", "Memory", "Job Details", "Logs")
     val runningExecutorTable =
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index 6c18a3c245..40d6bdb3fd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -19,17 +19,14 @@ package org.apache.spark.deploy.worker.ui
 
 import java.io.File
 
-import scala.concurrent.duration._
-
-import akka.util.Timeout
 import javax.servlet.http.HttpServletRequest
+import org.eclipse.jetty.server.{Handler, Server}
 
 import org.apache.spark.Logging
 import org.apache.spark.deploy.worker.Worker
 import org.apache.spark.ui.{JettyUtils, UIUtils}
 import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.Utils
-import org.eclipse.jetty.server.{Handler, Server}
+import org.apache.spark.util.{AkkaUtils, Utils}
 
 /**
  * Web UI server for the standalone worker.
@@ -37,8 +34,7 @@ import org.eclipse.jetty.server.{Handler, Server}
 
 private[spark] class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[Int] = None)
   extends Logging {
-  implicit val timeout = Timeout(
-    Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds"))
+  val timeout = AkkaUtils.askTimeout
   val host = Utils.localHostName()
   val port = requestedPort.getOrElse(
     System.getProperty("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index f5e8766f6d..7e22c843bf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -27,10 +27,10 @@ import akka.actor._
 import akka.pattern.ask
 import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 
-import org.apache.spark.{SparkException, Logging, TaskState}
+import org.apache.spark.{Logging, SparkException, TaskState}
 import org.apache.spark.scheduler.TaskDescription
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{AkkaUtils, Utils}
 
 /**
  * A scheduler backend that waits for coarse grained executors to connect to it through Akka.
@@ -47,6 +47,8 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
   // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
 
+  private val timeout = AkkaUtils.askTimeout
+
   class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
     private val executorActor = new HashMap[String, ActorRef]
     private val executorAddress = new HashMap[String, Address]
@@ -172,10 +174,6 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
       Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
   }
 
-  private val timeout = {
-    Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
-  }
-
   def stopExecutors() {
     try {
       if (driverActor != null) {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index e05b842476..e1d68ef592 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.storage
 
 import scala.concurrent.{Await, Future}
-import scala.concurrent.duration._
 import scala.concurrent.ExecutionContext.Implicits.global
 
 import akka.actor._
@@ -26,15 +25,17 @@ import akka.pattern.ask
 
 import org.apache.spark.{Logging, SparkException}
 import org.apache.spark.storage.BlockManagerMessages._
+import org.apache.spark.util.AkkaUtils
 
-private[spark] class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection]) extends Logging {
+private[spark]
+class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection]) extends Logging {
 
   val AKKA_RETRY_ATTEMPTS: Int = System.getProperty("spark.akka.num.retries", "3").toInt
   val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "3000").toInt
 
   val DRIVER_AKKA_ACTOR_NAME = "BlockManagerMaster"
 
-  val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+  val timeout = AkkaUtils.askTimeout
 
   /** Remove a dead executor from the driver actor. This is only called on the driver side. */
   def removeExecutor(execId: String) {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 154a3980e9..21022e1cfb 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -21,17 +21,15 @@ import java.util.{HashMap => JHashMap}
 
 import scala.collection.mutable
 import scala.collection.JavaConversions._
+import scala.concurrent.Future
+import scala.concurrent.duration._
 
 import akka.actor.{Actor, ActorRef, Cancellable}
 import akka.pattern.ask
-import scala.concurrent.duration._
-import scala.concurrent.Future
-
 import org.apache.spark.{Logging, SparkException}
 import org.apache.spark.storage.BlockManagerMessages._
-import org.apache.spark.util.Utils
-
+import org.apache.spark.util.{AkkaUtils, Utils}
 
 /**
  * BlockManagerMasterActor is an actor on the master node to track statuses of
@@ -50,8 +48,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
 
   // Mapping from block id to the set of block managers that have the block.
   private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
 
-  val akkaTimeout = Duration.create(
-    System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+  private val akkaTimeout = AkkaUtils.askTimeout
 
   initLogging()
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
index a5446b3fc3..39f422dd6b 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
@@ -28,9 +28,6 @@ import org.apache.spark.ui.JettyUtils._
 
 /** Web UI showing storage status of all RDD's in the given SparkContext. */
 private[spark] class BlockManagerUI(val sc: SparkContext) extends Logging {
-  implicit val timeout = Duration.create(
-    System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
-
   val indexPage = new IndexPage(this)
   val rddPage = new RDDPage(this)
 
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 74133cef6c..1c8b51b8bc 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.util
 
+import scala.concurrent.duration.{Duration, FiniteDuration}
+
 import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem}
 import com.typesafe.config.ConfigFactory
 
@@ -84,4 +86,8 @@ private[spark] object AkkaUtils {
     (actorSystem, boundPort)
   }
 
+  /** Returns the default Spark timeout to use for Akka ask operations. */
+  def askTimeout: FiniteDuration = {
+    Duration.create(System.getProperty("spark.akka.askTimeout", "30").toLong, "seconds")
+  }
 }
-- 
cgit v1.2.3
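
Editor's note: the sketch below is a minimal, self-contained illustration of
the call-site pattern this patch converges on, and is not part of the commit.
It reads the timeout once (mirroring the AkkaUtils.askTimeout helper added
above) and passes it explicitly to both ask() and Await.result instead of
relying on an implicit Timeout in scope. EchoActor, the "demo" actor system,
and AskTimeoutSketch are hypothetical stand-ins written against the Akka
2.2-era API the patch targets; only the body of askTimeout mirrors code from
the patch.

import scala.concurrent.Await
import scala.concurrent.duration.{Duration, FiniteDuration}

import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask

// Hypothetical actor, standing in for Master, BlockManagerMasterActor, etc.
class EchoActor extends Actor {
  def receive = { case msg => sender ! msg }
}

object AskTimeoutSketch {
  // Mirrors AkkaUtils.askTimeout from the patch: a single place that reads
  // spark.akka.askTimeout (in seconds), now defaulting to 30.
  def askTimeout: FiniteDuration =
    Duration.create(System.getProperty("spark.akka.askTimeout", "30").toLong, "seconds")

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("demo")
    val echo = system.actorOf(Props[EchoActor], "echo")

    // One explicit FiniteDuration, passed to ask() (where it is implicitly
    // converted to akka.util.Timeout) and reused in Await.result.
    val timeout = askTimeout
    val future = echo.ask("ping")(timeout)
    println(Await.result(future, timeout)) // prints "ping"

    system.shutdown()
  }
}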