-rw-r--r--  core/src/main/scala/org/apache/spark/MapOutputTracker.scala                                 5
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/client/Client.scala                             6
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala                            17
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala                14
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala                      16
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala                    10
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala                       4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala                    10
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala 10
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala                       7
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala                 11
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala                        3
-rw-r--r--  core/src/main/scala/org/apache/spark/util/AkkaUtils.scala                                    6
13 files changed, 48 insertions(+), 71 deletions(-)
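
Every hunk below applies the same refactoring: call sites that previously built their own ask timeout from the spark.akka.askTimeout system property now read the shared AkkaUtils.askTimeout helper added at the bottom of this patch. A minimal before/after sketch of the pattern (the enclosing class is illustrative; the two right-hand sides come from the hunks themselves):

    // Before: each class parsed the system property itself, with its own local default.
    import scala.concurrent.duration.Duration
    val timeout = Duration.create(
      System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")

    // After: one helper owns the property lookup and the default.
    import org.apache.spark.util.AkkaUtils
    val timeout = AkkaUtils.askTimeout
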
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 10fae5af9f..ccffcc356c 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -29,8 +29,7 @@ import akka.pattern.ask
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.{MetadataCleanerType, Utils, MetadataCleaner, TimeStampedHashMap}
-
+import org.apache.spark.util.{AkkaUtils, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
private[spark] sealed trait MapOutputTrackerMessage
private[spark] case class GetMapOutputStatuses(shuffleId: Int, requester: String)
@@ -53,7 +52,7 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
private[spark] class MapOutputTracker extends Logging {
- private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+ private val timeout = AkkaUtils.askTimeout
// Set to the MapOutputTrackerActor living on the driver
var trackerActor: Either[ActorRef, ActorSelection] = _
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index 4d95efa73a..953755e40d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -23,14 +23,14 @@ import scala.concurrent.duration._
import scala.concurrent.Await
import akka.actor._
-import akka.pattern.AskTimeoutException
import akka.pattern.ask
-import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent, AssociationErrorEvent}
+import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent}
import org.apache.spark.{SparkException, Logging}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
+import org.apache.spark.util.AkkaUtils
/**
@@ -178,7 +178,7 @@ private[spark] class Client(
def stop() {
if (actor != null) {
try {
- val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+ val timeout = AkkaUtils.askTimeout
val future = actor.ask(StopClient)(timeout)
Await.result(future, timeout)
} catch {
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index c627dd3806..7b2b1c3327 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -18,19 +18,16 @@
package org.apache.spark.deploy.master
import java.text.SimpleDateFormat
-import java.util.concurrent.TimeUnit
import java.util.Date
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.Await
import scala.concurrent.duration._
-import scala.concurrent.duration.{Duration, FiniteDuration}
import akka.actor._
import akka.pattern.ask
-import akka.remote._
+import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import akka.serialization.SerializationExtension
-import akka.util.Timeout
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
@@ -38,7 +35,7 @@ import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.MasterMessages._
import org.apache.spark.deploy.master.ui.MasterWebUI
import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.util.{Utils, AkkaUtils}
+import org.apache.spark.util.{AkkaUtils, Utils}
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
import context.dispatcher
@@ -537,12 +534,10 @@ private[spark] object Master {
def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int, Int) = {
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
- val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), name = actorName)
- val timeoutDuration: FiniteDuration = Duration.create(
- System.getProperty("spark.akka.askTimeout", "10").toLong, TimeUnit.SECONDS)
- implicit val timeout = Timeout(timeoutDuration)
- val respFuture = actor ? RequestWebUIPort // ask pattern
- val resp = Await.result(respFuture, timeoutDuration).asInstanceOf[WebUIPortResponse]
+ val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), actorName)
+ val timeout = AkkaUtils.askTimeout
+ val respFuture = actor.ask(RequestWebUIPort)(timeout)
+ val resp = Await.result(respFuture, timeout).asInstanceOf[WebUIPortResponse]
(actorSystem, boundPort, resp.webUIBoundPort)
}
}
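
Besides switching to the shared helper, the startSystemAndActor rewrite drops the implicit akka.util.Timeout and passes the duration explicitly to ask, so one value bounds both the ask and the Await. A hedged sketch of the two styles (actor and ExampleMessage are illustrative names, not from the patch):

    import scala.concurrent.Await
    import akka.pattern.ask
    import akka.util.Timeout
    import org.apache.spark.util.AkkaUtils

    // Old style: an implicit Timeout silently picked up by the ? operator,
    // with a separate duration handed to Await.
    implicit val implicitTimeout = Timeout(AkkaUtils.askTimeout)
    val oldReply = Await.result(actor ? ExampleMessage, AkkaUtils.askTimeout)

    // New style: one explicit FiniteDuration bounds both the ask and the Await,
    // so the two deadlines cannot drift apart.
    val timeout = AkkaUtils.askTimeout
    val newReply = Await.result(actor.ask(ExampleMessage)(timeout), timeout)
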
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index 3b983c19eb..dbb0cb90f5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -17,32 +17,28 @@
package org.apache.spark.deploy.master.ui
+import scala.concurrent.Await
import scala.xml.Node
import akka.pattern.ask
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
import javax.servlet.http.HttpServletRequest
-
import net.liftweb.json.JsonAST.JValue
-import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.JsonProtocol
+import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.ExecutorInfo
import org.apache.spark.ui.UIUtils
import org.apache.spark.util.Utils
private[spark] class ApplicationPage(parent: MasterWebUI) {
val master = parent.masterActorRef
- implicit val timeout = parent.timeout
+ val timeout = parent.timeout
/** Executor details for a particular application */
def renderJson(request: HttpServletRequest): JValue = {
val appId = request.getParameter("appId")
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
- val state = Await.result(stateFuture, 30 seconds)
+ val state = Await.result(stateFuture, timeout)
val app = state.activeApps.find(_.id == appId).getOrElse({
state.completedApps.find(_.id == appId).getOrElse(null)
})
@@ -53,7 +49,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) {
def render(request: HttpServletRequest): Seq[Node] = {
val appId = request.getParameter("appId")
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
- val state = Await.result(stateFuture, 30 seconds)
+ val state = Await.result(stateFuture, timeout)
val app = state.activeApps.find(_.id == appId).getOrElse({
state.completedApps.find(_.id == appId).getOrElse(null)
})
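
The same fix recurs in the UI pages: the ask already used parent.timeout, but the Await hardcoded 30 seconds, so the two bounds could disagree. After the change a single value governs both (master, RequestMasterState, and MasterStateResponse are the names from this hunk):

    import scala.concurrent.Await
    import akka.pattern.ask

    val timeout = parent.timeout  // a plain val now; no implicit needed
    val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
    val state = Await.result(stateFuture, timeout)  // same bound as the ask
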
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
index 65e7a14e7a..4ef762892c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
@@ -17,37 +17,33 @@
package org.apache.spark.deploy.master.ui
-import javax.servlet.http.HttpServletRequest
-
+import scala.concurrent.Await
import scala.xml.Node
-import scala.concurrent.Await
import akka.pattern.ask
-import scala.concurrent.duration._
-
+import javax.servlet.http.HttpServletRequest
import net.liftweb.json.JsonAST.JValue
-import org.apache.spark.deploy.DeployWebUI
+import org.apache.spark.deploy.{DeployWebUI, JsonProtocol}
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
-import org.apache.spark.deploy.JsonProtocol
import org.apache.spark.deploy.master.{ApplicationInfo, WorkerInfo}
import org.apache.spark.ui.UIUtils
import org.apache.spark.util.Utils
private[spark] class IndexPage(parent: MasterWebUI) {
val master = parent.masterActorRef
- implicit val timeout = parent.timeout
+ val timeout = parent.timeout
def renderJson(request: HttpServletRequest): JValue = {
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
- val state = Await.result(stateFuture, 30 seconds)
+ val state = Await.result(stateFuture, timeout)
JsonProtocol.writeMasterState(state)
}
/** Index view listing applications and executors */
def render(request: HttpServletRequest): Seq[Node] = {
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
- val state = Await.result(stateFuture, 30 seconds)
+ val state = Await.result(stateFuture, timeout)
val workerHeaders = Seq("Id", "Address", "State", "Cores", "Memory")
val workers = state.workers.sortBy(_.id)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index a211ce2b42..9ab594b682 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -17,25 +17,21 @@
package org.apache.spark.deploy.master.ui
-import scala.concurrent.duration._
-
import javax.servlet.http.HttpServletRequest
-
import org.eclipse.jetty.server.{Handler, Server}
-import org.apache.spark.{Logging}
+import org.apache.spark.Logging
import org.apache.spark.deploy.master.Master
import org.apache.spark.ui.JettyUtils
import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{AkkaUtils, Utils}
/**
* Web UI server for the standalone master.
*/
private[spark]
class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
- implicit val timeout = Duration.create(
- System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+ val timeout = AkkaUtils.askTimeout
val host = Utils.localHostName()
val port = requestedPort
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
index 1a768d501f..0d59048313 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
@@ -42,13 +42,13 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
def renderJson(request: HttpServletRequest): JValue = {
val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
- val workerState = Await.result(stateFuture, 30 seconds)
+ val workerState = Await.result(stateFuture, timeout)
JsonProtocol.writeWorkerState(workerState)
}
def render(request: HttpServletRequest): Seq[Node] = {
val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
- val workerState = Await.result(stateFuture, 30 seconds)
+ val workerState = Await.result(stateFuture, timeout)
val executorHeaders = Seq("ExecutorID", "Cores", "Memory", "Job Details", "Logs")
val runningExecutorTable =
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index 6c18a3c245..40d6bdb3fd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -19,17 +19,14 @@ package org.apache.spark.deploy.worker.ui
import java.io.File
-import scala.concurrent.duration._
-
-import akka.util.Timeout
import javax.servlet.http.HttpServletRequest
+import org.eclipse.jetty.server.{Handler, Server}
import org.apache.spark.Logging
import org.apache.spark.deploy.worker.Worker
import org.apache.spark.ui.{JettyUtils, UIUtils}
import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.Utils
-import org.eclipse.jetty.server.{Handler, Server}
+import org.apache.spark.util.{AkkaUtils, Utils}
/**
* Web UI server for the standalone worker.
@@ -37,8 +34,7 @@ import org.eclipse.jetty.server.{Handler, Server}
private[spark]
class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[Int] = None)
extends Logging {
- implicit val timeout = Timeout(
- Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds"))
+ val timeout = AkkaUtils.askTimeout
val host = Utils.localHostName()
val port = requestedPort.getOrElse(
System.getProperty("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index f5e8766f6d..7e22c843bf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -27,10 +27,10 @@ import akka.actor._
import akka.pattern.ask
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
-import org.apache.spark.{SparkException, Logging, TaskState}
+import org.apache.spark.{Logging, SparkException, TaskState}
import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{AkkaUtils, Utils}
/**
* A scheduler backend that waits for coarse grained executors to connect to it through Akka.
@@ -47,6 +47,8 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
var totalCoreCount = new AtomicInteger(0)
+ private val timeout = AkkaUtils.askTimeout
+
class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
private val executorActor = new HashMap[String, ActorRef]
private val executorAddress = new HashMap[String, Address]
@@ -172,10 +174,6 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
}
- private val timeout = {
- Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
- }
-
def stopExecutors() {
try {
if (driverActor != null) {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index e05b842476..e1d68ef592 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -18,7 +18,6 @@
package org.apache.spark.storage
import scala.concurrent.{Await, Future}
-import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import akka.actor._
@@ -26,15 +25,17 @@ import akka.pattern.ask
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.storage.BlockManagerMessages._
+import org.apache.spark.util.AkkaUtils
-private[spark] class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection]) extends Logging {
+private[spark]
+class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection]) extends Logging {
val AKKA_RETRY_ATTEMPTS: Int = System.getProperty("spark.akka.num.retries", "3").toInt
val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "3000").toInt
val DRIVER_AKKA_ACTOR_NAME = "BlockManagerMaster"
- val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+ val timeout = AkkaUtils.askTimeout
/** Remove a dead executor from the driver actor. This is only called on the driver side. */
def removeExecutor(execId: String) {
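
BlockManagerMaster keeps its own retry knobs (spark.akka.num.retries, spark.akka.retry.wait) alongside the now-shared timeout. A hedged sketch of how an ask-with-retries loop can combine the three values; this illustrates the class's driver-communication pattern under those assumptions, not its verbatim implementation:

    import scala.concurrent.Await
    import akka.pattern.ask
    import org.apache.spark.SparkException

    // Illustrative retry loop: each attempt is bounded by the shared timeout,
    // and failed attempts back off by AKKA_RETRY_INTERVAL_MS before retrying.
    private def askDriver[T](message: Any): T = {
      var attempts = 0
      var lastException: Exception = null
      while (attempts < AKKA_RETRY_ATTEMPTS) {
        attempts += 1
        try {
          val future = driverActor match {
            case Left(actorRef)   => actorRef.ask(message)(timeout)
            case Right(selection) => selection.ask(message)(timeout)
          }
          return Await.result(future, timeout).asInstanceOf[T]
        } catch {
          case e: Exception =>
            lastException = e
            Thread.sleep(AKKA_RETRY_INTERVAL_MS)
        }
      }
      throw new SparkException("Error communicating with driver", lastException)
    }
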
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 154a3980e9..21022e1cfb 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -21,17 +21,15 @@ import java.util.{HashMap => JHashMap}
import scala.collection.mutable
import scala.collection.JavaConversions._
+import scala.concurrent.Future
+import scala.concurrent.duration._
import akka.actor.{Actor, ActorRef, Cancellable}
import akka.pattern.ask
-import scala.concurrent.duration._
-import scala.concurrent.Future
-
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.storage.BlockManagerMessages._
-import org.apache.spark.util.Utils
-
+import org.apache.spark.util.{AkkaUtils, Utils}
/**
* BlockManagerMasterActor is an actor on the master node to track statuses of
@@ -50,8 +48,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
// Mapping from block id to the set of block managers that have the block.
private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
- val akkaTimeout = Duration.create(
- System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+ private val akkaTimeout = AkkaUtils.askTimeout
initLogging()
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
index a5446b3fc3..39f422dd6b 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
@@ -28,9 +28,6 @@ import org.apache.spark.ui.JettyUtils._
/** Web UI showing storage status of all RDD's in the given SparkContext. */
private[spark] class BlockManagerUI(val sc: SparkContext) extends Logging {
- implicit val timeout = Duration.create(
- System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
-
val indexPage = new IndexPage(this)
val rddPage = new RDDPage(this)
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 74133cef6c..1c8b51b8bc 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -17,6 +17,8 @@
package org.apache.spark.util
+import scala.concurrent.duration.{Duration, FiniteDuration}
+
import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem}
import com.typesafe.config.ConfigFactory
@@ -84,4 +86,8 @@ private[spark] object AkkaUtils {
(actorSystem, boundPort)
}
+ /** Returns the default Spark timeout to use for Akka ask operations. */
+ def askTimeout: FiniteDuration = {
+ Duration.create(System.getProperty("spark.akka.askTimeout", "30").toLong, "seconds")
+ }
}
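
With the helper in place, the ask timeout is configured in exactly one spot. A usage sketch (the 60-second value is illustrative); note that the classes above capture askTimeout into a val at construction time, so the property must be set before those components start:

    // Raise the ask timeout for a slow or heavily loaded cluster; must run
    // before any component reads AkkaUtils.askTimeout into its val.
    System.setProperty("spark.akka.askTimeout", "60")

    val timeout = AkkaUtils.askTimeout  // FiniteDuration of 60 seconds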