diff options
9 files changed, 23 insertions, 37 deletions
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index a686b534da..88a7f24884 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -21,15 +21,11 @@ import java.io._ import java.util.zip.{GZIPInputStream, GZIPOutputStream} import scala.collection.mutable.HashSet +import scala.concurrent.Await +import scala.concurrent.duration._ import akka.actor._ -import scala.concurrent.Await import akka.pattern.ask -import akka.remote._ - -import scala.concurrent.duration.Duration -import akka.util.Timeout -import scala.concurrent.duration._ import org.apache.spark.scheduler.MapStatus import org.apache.spark.storage.BlockManagerId diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 0e2b461b13..c627dd3806 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -39,9 +39,6 @@ import org.apache.spark.deploy.master.MasterMessages._ import org.apache.spark.deploy.master.ui.MasterWebUI import org.apache.spark.metrics.MetricsSystem import org.apache.spark.util.{Utils, AkkaUtils} -import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed -import org.apache.spark.deploy.DeployMessages.KillExecutor -import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging { import context.dispatcher @@ -159,7 +156,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act System.exit(0) } - case RegisterWorker(id, host, workerPort, cores, memory, webUiPort, publicAddress) => { + case RegisterWorker(id, workerHost, workerPort, cores, memory, workerWebUiPort, publicAddress) => { logInfo("Registering worker %s:%d with %d cores, %s RAM".format( host, workerPort, cores, Utils.megabytesToString(memory))) if (state == RecoveryState.STANDBY) { @@ -167,7 +164,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act } else if (idToWorker.contains(id)) { sender ! RegisterWorkerFailed("Duplicate worker ID") } else { - val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress) + val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory, + sender, workerWebUiPort, publicAddress) registerWorker(worker) persistenceEngine.addWorker(worker) sender ! RegisteredWorker(masterUrl, masterWebUiUrl) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 0a183afd8e..808b54c0af 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -27,7 +27,7 @@ import scala.concurrent.duration._ import akka.actor._ import akka.remote.{ DisassociatedEvent, RemotingLifecycleEvent} -import org.apache.spark.Logging +import org.apache.spark.{SparkException, Logging} import org.apache.spark.deploy.{ExecutorDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.Master @@ -73,6 +73,7 @@ private[spark] class Worker( val masterLock: Object = new Object() var master: ActorSelection = null + var masterAddress: Address = null var activeMasterUrl: String = "" var activeMasterWebUiUrl : String = "" @volatile var registered = false @@ -136,6 +137,10 @@ private[spark] class Worker( activeMasterUrl = url activeMasterWebUiUrl = uiUrl master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl)) + masterAddress = activeMasterUrl match { + case Master.sparkUrlRegex(_host, _port) => Address("akka.tcp", Master.systemName, _host, _port.toInt) + case x => throw new SparkException("Invalid spark URL:"+x) + } connected = true } } @@ -240,7 +245,7 @@ private[spark] class Worker( } } - case x: DisassociatedEvent => + case x: DisassociatedEvent if x.remoteAddress == masterAddress => logInfo(s"$x Disassociated !") masterDisconnected() diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index fdea3f6f88..773e9ec182 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -22,7 +22,6 @@ import java.util.Properties import java.util.concurrent.atomic.AtomicInteger import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map} -import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ import scala.reflect.ClassTag @@ -115,6 +114,7 @@ class DAGScheduler( private val eventProcessActor: ActorRef = env.actorSystem.actorOf(Props(new Actor { override def preStart() { + import context.dispatcher context.system.scheduler.schedule(RESUBMIT_TIMEOUT, RESUBMIT_TIMEOUT) { if (failed.size > 0) { resubmitFailedStages() diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala index 7c9d6a93e4..8056cb2597 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala @@ -24,8 +24,6 @@ import java.util.{TimerTask, Timer} import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap import scala.collection.mutable.HashSet - -import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ import org.apache.spark._ @@ -123,7 +121,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) if (System.getProperty("spark.speculation", "false").toBoolean) { logInfo("Starting speculative execution thread") - + import sc.env.actorSystem.dispatcher sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds, SPECULATION_INTERVAL milliseconds) { checkSpeculatableTasks() diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index d614dcbdd8..f5e8766f6d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -25,7 +25,7 @@ import scala.concurrent.duration._ import akka.actor._ import akka.pattern.ask -import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent} +import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent} import org.apache.spark.{SparkException, Logging, TaskState} import org.apache.spark.scheduler.TaskDescription @@ -52,7 +52,6 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac private val executorAddress = new HashMap[String, Address] private val executorHost = new HashMap[String, String] private val freeCores = new HashMap[String, Int] - private val actorToExecutorId = new HashMap[ActorRef, String] private val addressToExecutorId = new HashMap[Address, String] override def preStart() { @@ -77,7 +76,6 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac executorHost(executorId) = Utils.parseHostPort(hostPort)._1 freeCores(executorId) = cores executorAddress(executorId) = sender.path.address - actorToExecutorId(sender) = executorId addressToExecutorId(sender.path.address) = executorId totalCoreCount.addAndGet(cores) makeOffers() @@ -147,7 +145,6 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac if (executorActor.contains(executorId)) { logInfo("Executor " + executorId + " disconnected, so removing it") val numCores = freeCores(executorId) - actorToExecutorId -= executorActor(executorId) addressToExecutorId -= executorAddress(executorId) executorActor -= executorId executorHost -= executorId diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index a4aa316e4b..e5de16fc01 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -17,24 +17,15 @@ package org.apache.spark.storage -import akka.actor._ -import scala.concurrent.Await -import scala.concurrent.Future +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ import scala.concurrent.ExecutionContext.Implicits.global +import akka.actor._ import akka.pattern.ask -import scala.concurrent.duration._ import org.apache.spark.{Logging, SparkException} import org.apache.spark.storage.BlockManagerMessages._ -import org.apache.spark.storage.BlockManagerMessages.GetLocations -import org.apache.spark.storage.BlockManagerMessages.GetLocationsMultipleBlockIds -import org.apache.spark.storage.BlockManagerMessages.RegisterBlockManager -import org.apache.spark.storage.BlockManagerMessages.HeartBeat -import org.apache.spark.storage.BlockManagerMessages.RemoveExecutor -import org.apache.spark.storage.BlockManagerMessages.GetPeers -import org.apache.spark.storage.BlockManagerMessages.RemoveBlock -import org.apache.spark.storage.BlockManagerMessages.RemoveRdd private[spark] class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection]) extends Logging { diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index 23e9b735f3..3444d8fdfe 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -44,8 +44,9 @@ private[spark] object AkkaUtils { val akkaFrameSize = System.getProperty("spark.akka.frameSize", "10").toInt val lifecycleEvents = if (System.getProperty("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off" - val akkaHeartBeatPauses = System.getProperty("spark.akka.pauses", "60").toInt - val akkaFailureDetector = System.getProperty("spark.akka.failure-detector.threshold", "12.0").toDouble + val akkaHeartBeatPauses = System.getProperty("spark.akka.heartbeat.pauses", "60").toInt + val akkaFailureDetector = + System.getProperty("spark.akka.failure-detector.threshold", "12.0").toDouble val akkaHeartBeatInterval = System.getProperty("spark.akka.heartbeat.interval", "5").toInt val akkaConf = ConfigFactory.parseString( diff --git a/docs/configuration.md b/docs/configuration.md index 25e7cecbfc..4d1a987f64 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -281,7 +281,7 @@ Apart from these, the following properties are also available, and may be useful </td> </tr> <tr> - <td>spark.akka.pauses</td> + <td>spark.akka.heartbeat.pauses</td> <td>60</td> <td> Acceptable heart beat pause in seconds for akka, tune this if you expect GC pauses or network delays (reconnections) etc. @@ -298,7 +298,7 @@ Apart from these, the following properties are also available, and may be useful <td>spark.akka.heartbeat.interval</td> <td>5</td> <td> - A larger interval value in seconds reduces network overhead and a smaller value might be more informative for akka's failure detector. Tune this in combination of `spark.akka.pauses` and `spark.akka.failure-detector.threshold` if you need to. + A larger interval value in seconds reduces network overhead and a smaller value might be more informative for akka's failure detector. Tune this in combination of `spark.akka.heartbeat.pauses` and `spark.akka.failure-detector.threshold` if you need to. </td> </tr> <tr> |