diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-01-27 19:23:49 -0800 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-01-27 19:23:49 -0800 |
commit | 44b4a0f88fcb31727347b755ae8ec14d69571b52 (patch) | |
tree | a4d8953bccec3f9898a6639d18ba87d1a27b291a /core | |
parent | b9e2d9efecb3dcffb6b032f0502373782ad634d6 (diff) | |
download | spark-44b4a0f88fcb31727347b755ae8ec14d69571b52.tar.gz spark-44b4a0f88fcb31727347b755ae8ec14d69571b52.tar.bz2 spark-44b4a0f88fcb31727347b755ae8ec14d69571b52.zip |
Track workers by executor ID instead of hostname to allow multiple
executors per machine and remove the need for multiple IP addresses in
unit tests.
Diffstat (limited to 'core')
34 files changed, 342 insertions, 313 deletions
diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala index ac02f3363a..c1f012b419 100644 --- a/core/src/main/scala/spark/MapOutputTracker.scala +++ b/core/src/main/scala/spark/MapOutputTracker.scala @@ -114,7 +114,7 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea var array = mapStatuses(shuffleId) if (array != null) { array.synchronized { - if (array(mapId) != null && array(mapId).address == bmAddress) { + if (array(mapId) != null && array(mapId).location == bmAddress) { array(mapId) = null } } @@ -277,7 +277,7 @@ private[spark] object MapOutputTracker { throw new FetchFailedException(null, shuffleId, -1, reduceId, new Exception("Missing an output location for shuffle " + shuffleId)) } else { - (status.address, decompressSize(status.compressedSizes(reduceId))) + (status.location, decompressSize(status.compressedSizes(reduceId))) } } } diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 4581c0adcf..39721b47ae 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -80,6 +80,7 @@ class SparkContext( // Create the Spark execution environment (cache, map output tracker, etc) private[spark] val env = SparkEnv.createFromSystemProperties( + "<driver>", System.getProperty("spark.master.host"), System.getProperty("spark.master.port").toInt, true, @@ -97,7 +98,7 @@ class SparkContext( // Keeps track of all persisted RDDs private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]() - private[spark] val metadataCleaner = new MetadataCleaner("DAGScheduler", this.cleanup) + private[spark] val metadataCleaner = new MetadataCleaner("SparkContext", this.cleanup) // Add each JAR given through the constructor @@ -649,10 +650,9 @@ class SparkContext( /** Register a new RDD, returning its RDD ID */ private[spark] def newRddId(): Int = nextRddId.getAndIncrement() + /** Called by MetadataCleaner to clean up the persistentRdds map periodically */ private[spark] def cleanup(cleanupTime: Long) { - var sizeBefore = persistentRdds.size persistentRdds.clearOldValues(cleanupTime) - logInfo("idToStage " + sizeBefore + " --> " + persistentRdds.size) } } diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala index 2a7a8af83d..0c094edcf3 100644 --- a/core/src/main/scala/spark/SparkEnv.scala +++ b/core/src/main/scala/spark/SparkEnv.scala @@ -19,6 +19,7 @@ import spark.util.AkkaUtils * SparkEnv.get (e.g. after creating a SparkContext) and set it with SparkEnv.set. */ class SparkEnv ( + val executorId: String, val actorSystem: ActorSystem, val serializer: Serializer, val closureSerializer: Serializer, @@ -58,11 +59,12 @@ object SparkEnv extends Logging { } def createFromSystemProperties( + executorId: String, hostname: String, port: Int, isMaster: Boolean, - isLocal: Boolean - ) : SparkEnv = { + isLocal: Boolean): SparkEnv = { + val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port) // Bit of a hack: If this is the master and our port was 0 (meaning bind to any free port), @@ -86,7 +88,7 @@ object SparkEnv extends Logging { val masterPort: Int = System.getProperty("spark.master.port", "7077").toInt val blockManagerMaster = new BlockManagerMaster( actorSystem, isMaster, isLocal, masterIp, masterPort) - val blockManager = new BlockManager(actorSystem, blockManagerMaster, serializer) + val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer) val connectionManager = blockManager.connectionManager @@ -122,6 +124,7 @@ object SparkEnv extends Logging { } new SparkEnv( + executorId, actorSystem, serializer, closureSerializer, diff --git a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala index 4211d80596..8f51051e39 100644 --- a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala +++ b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala @@ -9,6 +9,12 @@ import spark.{Logging, Utils} import scala.collection.mutable.ArrayBuffer +/** + * Testing class that creates a Spark standalone process in-cluster (that is, running the + * spark.deploy.master.Master and spark.deploy.worker.Workers in the same JVMs). Executors launched + * by the Workers still run in separate JVMs. This can be used to test distributed operation and + * fault recovery without spinning up a lot of processes. + */ private[spark] class LocalSparkCluster(numSlaves: Int, coresPerSlave: Int, memoryPerSlave: Int) extends Logging { @@ -35,16 +41,12 @@ class LocalSparkCluster(numSlaves: Int, coresPerSlave: Int, memoryPerSlave: Int) /* Start the Slaves */ for (slaveNum <- 1 to numSlaves) { - /* We can pretend to test distributed stuff by giving the slaves distinct hostnames. - All of 127/8 should be a loopback, we use 127.100.*.* in hopes that it is - sufficiently distinctive. */ - val slaveIpAddress = "127.100.0." + (slaveNum % 256) val (actorSystem, boundPort) = - AkkaUtils.createActorSystem("sparkWorker" + slaveNum, slaveIpAddress, 0) + AkkaUtils.createActorSystem("sparkWorker" + slaveNum, localIpAddress, 0) slaveActorSystems += actorSystem val actor = actorSystem.actorOf( - Props(new Worker(slaveIpAddress, boundPort, 0, coresPerSlave, memoryPerSlave, masterUrl)), - name = "Worker") + Props(new Worker(localIpAddress, boundPort, 0, coresPerSlave, memoryPerSlave, masterUrl)), + name = "Worker") slaveActors += actor } diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala index 2c2cd0231b..2e7e868579 100644 --- a/core/src/main/scala/spark/deploy/master/Master.scala +++ b/core/src/main/scala/spark/deploy/master/Master.scala @@ -97,10 +97,10 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor exec.worker.removeExecutor(exec) // Only retry certain number of times so we don't go into an infinite loop. - if (jobInfo.incrementRetryCount <= JobState.MAX_NUM_RETRY) { + if (jobInfo.incrementRetryCount < JobState.MAX_NUM_RETRY) { schedule() } else { - val e = new SparkException("Job %s wth ID %s failed %d times.".format( + val e = new SparkException("Job %s with ID %s failed %d times.".format( jobInfo.desc.name, jobInfo.id, jobInfo.retryCount)) logError(e.getMessage, e) throw e diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala index 0d1fe2a6b4..af3acfecb6 100644 --- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala @@ -67,7 +67,7 @@ private[spark] class ExecutorRunner( /** Replace variables such as {{SLAVEID}} and {{CORES}} in a command argument passed to us */ def substituteVariables(argument: String): String = argument match { - case "{{SLAVEID}}" => workerId + case "{{EXECUTOR_ID}}" => execId.toString case "{{HOSTNAME}}" => hostname case "{{CORES}}" => cores.toString case other => other diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala index 28d9d40d43..bd21ba719a 100644 --- a/core/src/main/scala/spark/executor/Executor.scala +++ b/core/src/main/scala/spark/executor/Executor.scala @@ -30,7 +30,7 @@ private[spark] class Executor extends Logging { initLogging() - def initialize(slaveHostname: String, properties: Seq[(String, String)]) { + def initialize(executorId: String, slaveHostname: String, properties: Seq[(String, String)]) { // Make sure the local hostname we report matches the cluster scheduler's name for this host Utils.setCustomHostname(slaveHostname) @@ -64,7 +64,7 @@ private[spark] class Executor extends Logging { ) // Initialize Spark environment (using system properties read above) - env = SparkEnv.createFromSystemProperties(slaveHostname, 0, false, false) + env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false) SparkEnv.set(env) // Start worker thread pool diff --git a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala index eeab3959c6..1ef88075ad 100644 --- a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala +++ b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala @@ -29,9 +29,10 @@ private[spark] class MesosExecutorBackend(executor: Executor) executorInfo: ExecutorInfo, frameworkInfo: FrameworkInfo, slaveInfo: SlaveInfo) { + logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue) this.driver = driver val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) - executor.initialize(slaveInfo.getHostname, properties) + executor.initialize(executorInfo.getExecutorId.getValue, slaveInfo.getHostname, properties) } override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) { diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala index a29bf974d2..435ee5743e 100644 --- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala +++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala @@ -17,7 +17,7 @@ import spark.scheduler.cluster.RegisterSlave private[spark] class StandaloneExecutorBackend( executor: Executor, masterUrl: String, - slaveId: String, + executorId: String, hostname: String, cores: Int) extends Actor @@ -30,7 +30,7 @@ private[spark] class StandaloneExecutorBackend( try { logInfo("Connecting to master: " + masterUrl) master = context.actorFor(masterUrl) - master ! RegisterSlave(slaveId, hostname, cores) + master ! RegisterSlave(executorId, hostname, cores) context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) context.watch(master) // Doesn't work with remote actors, but useful for testing } catch { @@ -43,7 +43,7 @@ private[spark] class StandaloneExecutorBackend( override def receive = { case RegisteredSlave(sparkProperties) => logInfo("Successfully registered with master") - executor.initialize(hostname, sparkProperties) + executor.initialize(executorId, hostname, sparkProperties) case RegisterSlaveFailed(message) => logError("Slave registration failed: " + message) @@ -55,24 +55,24 @@ private[spark] class StandaloneExecutorBackend( } override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) { - master ! StatusUpdate(slaveId, taskId, state, data) + master ! StatusUpdate(executorId, taskId, state, data) } } private[spark] object StandaloneExecutorBackend { - def run(masterUrl: String, slaveId: String, hostname: String, cores: Int) { + def run(masterUrl: String, executorId: String, hostname: String, cores: Int) { // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor // before getting started with all our system properties, etc val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0) val actor = actorSystem.actorOf( - Props(new StandaloneExecutorBackend(new Executor, masterUrl, slaveId, hostname, cores)), + Props(new StandaloneExecutorBackend(new Executor, masterUrl, executorId, hostname, cores)), name = "Executor") actorSystem.awaitTermination() } def main(args: Array[String]) { if (args.length != 4) { - System.err.println("Usage: StandaloneExecutorBackend <master> <slaveId> <hostname> <cores>") + System.err.println("Usage: StandaloneExecutorBackend <master> <executorId> <hostname> <cores>") System.exit(1) } run(args(0), args(1), args(2), args(3).toInt) diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index f599eb00bd..bd541d4207 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -35,9 +35,9 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with eventQueue.put(CompletionEvent(task, reason, result, accumUpdates)) } - // Called by TaskScheduler when a host fails. - override def hostLost(host: String) { - eventQueue.put(HostLost(host)) + // Called by TaskScheduler when an executor fails. + override def executorLost(execId: String) { + eventQueue.put(ExecutorLost(execId)) } // Called by TaskScheduler to cancel an entire TaskSet due to repeated failures. @@ -72,7 +72,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with // For tracking failed nodes, we use the MapOutputTracker's generation number, which is // sent with every task. When we detect a node failing, we note the current generation number - // and failed host, increment it for new tasks, and use this to ignore stray ShuffleMapTask + // and failed executor, increment it for new tasks, and use this to ignore stray ShuffleMapTask // results. // TODO: Garbage collect information about failure generations when we know there are no more // stray messages to detect. @@ -108,7 +108,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with } def clearCacheLocs() { - cacheLocs.clear + cacheLocs.clear() } /** @@ -271,8 +271,8 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with submitStage(finalStage) } - case HostLost(host) => - handleHostLost(host) + case ExecutorLost(execId) => + handleExecutorLost(execId) case completion: CompletionEvent => handleTaskCompletion(completion) @@ -436,10 +436,10 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with case smt: ShuffleMapTask => val stage = idToStage(smt.stageId) val status = event.result.asInstanceOf[MapStatus] - val host = status.address.ip - logInfo("ShuffleMapTask finished with host " + host) - if (failedGeneration.contains(host) && smt.generation <= failedGeneration(host)) { - logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + host) + val execId = status.location.executorId + logDebug("ShuffleMapTask finished on " + execId) + if (failedGeneration.contains(execId) && smt.generation <= failedGeneration(execId)) { + logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId) } else { stage.addOutputLoc(smt.partition, status) } @@ -511,9 +511,9 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with // Remember that a fetch failed now; this is used to resubmit the broken // stages later, after a small wait (to give other tasks the chance to fail) lastFetchFailureTime = System.currentTimeMillis() // TODO: Use pluggable clock - // TODO: mark the host as failed only if there were lots of fetch failures on it + // TODO: mark the executor as failed only if there were lots of fetch failures on it if (bmAddress != null) { - handleHostLost(bmAddress.ip, Some(task.generation)) + handleExecutorLost(bmAddress.executorId, Some(task.generation)) } case other => @@ -523,21 +523,21 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with } /** - * Responds to a host being lost. This is called inside the event loop so it assumes that it can - * modify the scheduler's internal state. Use hostLost() to post a host lost event from outside. + * Responds to an executor being lost. This is called inside the event loop, so it assumes it can + * modify the scheduler's internal state. Use executorLost() to post a loss event from outside. * * Optionally the generation during which the failure was caught can be passed to avoid allowing * stray fetch failures from possibly retriggering the detection of a node as lost. */ - def handleHostLost(host: String, maybeGeneration: Option[Long] = None) { + def handleExecutorLost(execId: String, maybeGeneration: Option[Long] = None) { val currentGeneration = maybeGeneration.getOrElse(mapOutputTracker.getGeneration) - if (!failedGeneration.contains(host) || failedGeneration(host) < currentGeneration) { - failedGeneration(host) = currentGeneration - logInfo("Host lost: " + host + " (generation " + currentGeneration + ")") - env.blockManager.master.notifyADeadHost(host) + if (!failedGeneration.contains(execId) || failedGeneration(execId) < currentGeneration) { + failedGeneration(execId) = currentGeneration + logInfo("Executor lost: %s (generation %d)".format(execId, currentGeneration)) + env.blockManager.master.removeExecutor(execId) // TODO: This will be really slow if we keep accumulating shuffle map stages for ((shuffleId, stage) <- shuffleToMapStage) { - stage.removeOutputsOnHost(host) + stage.removeOutputsOnExecutor(execId) val locs = stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray mapOutputTracker.registerMapOutputs(shuffleId, locs, true) } @@ -546,7 +546,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with } clearCacheLocs() } else { - logDebug("Additional host lost message for " + host + + logDebug("Additional executor lost message for " + execId + "(generation " + currentGeneration + ")") } } diff --git a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala index 3422a21d9d..b34fa78c07 100644 --- a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala +++ b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala @@ -28,7 +28,7 @@ private[spark] case class CompletionEvent( accumUpdates: Map[Long, Any]) extends DAGSchedulerEvent -private[spark] case class HostLost(host: String) extends DAGSchedulerEvent +private[spark] case class ExecutorLost(execId: String) extends DAGSchedulerEvent private[spark] case class TaskSetFailed(taskSet: TaskSet, reason: String) extends DAGSchedulerEvent diff --git a/core/src/main/scala/spark/scheduler/MapStatus.scala b/core/src/main/scala/spark/scheduler/MapStatus.scala index fae643f3a8..203abb917b 100644 --- a/core/src/main/scala/spark/scheduler/MapStatus.scala +++ b/core/src/main/scala/spark/scheduler/MapStatus.scala @@ -8,19 +8,19 @@ import java.io.{ObjectOutput, ObjectInput, Externalizable} * task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks. * The map output sizes are compressed using MapOutputTracker.compressSize. */ -private[spark] class MapStatus(var address: BlockManagerId, var compressedSizes: Array[Byte]) +private[spark] class MapStatus(var location: BlockManagerId, var compressedSizes: Array[Byte]) extends Externalizable { def this() = this(null, null) // For deserialization only def writeExternal(out: ObjectOutput) { - address.writeExternal(out) + location.writeExternal(out) out.writeInt(compressedSizes.length) out.write(compressedSizes) } def readExternal(in: ObjectInput) { - address = BlockManagerId(in) + location = BlockManagerId(in) compressedSizes = new Array[Byte](in.readInt()) in.readFully(compressedSizes) } diff --git a/core/src/main/scala/spark/scheduler/Stage.scala b/core/src/main/scala/spark/scheduler/Stage.scala index 4846b66729..e9419728e3 100644 --- a/core/src/main/scala/spark/scheduler/Stage.scala +++ b/core/src/main/scala/spark/scheduler/Stage.scala @@ -51,18 +51,18 @@ private[spark] class Stage( def removeOutputLoc(partition: Int, bmAddress: BlockManagerId) { val prevList = outputLocs(partition) - val newList = prevList.filterNot(_.address == bmAddress) + val newList = prevList.filterNot(_.location == bmAddress) outputLocs(partition) = newList if (prevList != Nil && newList == Nil) { numAvailableOutputs -= 1 } } - def removeOutputsOnHost(host: String) { + def removeOutputsOnExecutor(execId: String) { var becameUnavailable = false for (partition <- 0 until numPartitions) { val prevList = outputLocs(partition) - val newList = prevList.filterNot(_.address.ip == host) + val newList = prevList.filterNot(_.location.executorId == execId) outputLocs(partition) = newList if (prevList != Nil && newList == Nil) { becameUnavailable = true @@ -70,7 +70,8 @@ private[spark] class Stage( } } if (becameUnavailable) { - logInfo("%s is now unavailable on %s (%d/%d, %s)".format(this, host, numAvailableOutputs, numPartitions, isAvailable)) + logInfo("%s is now unavailable on executor %s (%d/%d, %s)".format( + this, execId, numAvailableOutputs, numPartitions, isAvailable)) } } @@ -82,7 +83,7 @@ private[spark] class Stage( def origin: String = rdd.origin - override def toString = "Stage " + id // + ": [RDD = " + rdd.id + ", isShuffle = " + isShuffleMap + "]" + override def toString = "Stage " + id override def hashCode(): Int = id } diff --git a/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala b/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala index fa4de15d0d..9fcef86e46 100644 --- a/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala +++ b/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala @@ -12,7 +12,7 @@ private[spark] trait TaskSchedulerListener { def taskEnded(task: Task[_], reason: TaskEndReason, result: Any, accumUpdates: Map[Long, Any]): Unit // A node was lost from the cluster. - def hostLost(host: String): Unit + def executorLost(execId: String): Unit // The TaskScheduler wants to abort an entire task set. def taskSetFailed(taskSet: TaskSet, reason: String): Unit diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala index a639b72795..0b4177805b 100644 --- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala +++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala @@ -27,19 +27,20 @@ private[spark] class ClusterScheduler(val sc: SparkContext) var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager] val taskIdToTaskSetId = new HashMap[Long, String] - val taskIdToSlaveId = new HashMap[Long, String] + val taskIdToExecutorId = new HashMap[Long, String] val taskSetTaskIds = new HashMap[String, HashSet[Long]] // Incrementing Mesos task IDs val nextTaskId = new AtomicLong(0) - // Which hosts in the cluster are alive (contains hostnames) - val hostsAlive = new HashSet[String] + // Which executor IDs we have executors on + val activeExecutorIds = new HashSet[String] - // Which slave IDs we have executors on - val slaveIdsWithExecutors = new HashSet[String] + // The set of executors we have on each host; this is used to compute hostsAlive, which + // in turn is used to decide when we can attain data locality on a given host + val executorsByHost = new HashMap[String, HashSet[String]] - val slaveIdToHost = new HashMap[String, String] + val executorIdToHost = new HashMap[String, String] // JAR server, if any JARs were added by the user to the SparkContext var jarServer: HttpServer = null @@ -102,7 +103,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) activeTaskSets -= manager.taskSet.id activeTaskSetsQueue -= manager taskIdToTaskSetId --= taskSetTaskIds(manager.taskSet.id) - taskIdToSlaveId --= taskSetTaskIds(manager.taskSet.id) + taskIdToExecutorId --= taskSetTaskIds(manager.taskSet.id) taskSetTaskIds.remove(manager.taskSet.id) } } @@ -117,8 +118,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) SparkEnv.set(sc.env) // Mark each slave as alive and remember its hostname for (o <- offers) { - slaveIdToHost(o.slaveId) = o.hostname - hostsAlive += o.hostname + executorIdToHost(o.executorId) = o.hostname } // Build a list of tasks to assign to each slave val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores)) @@ -128,16 +128,20 @@ private[spark] class ClusterScheduler(val sc: SparkContext) do { launchedTask = false for (i <- 0 until offers.size) { - val sid = offers(i).slaveId + val execId = offers(i).executorId val host = offers(i).hostname - manager.slaveOffer(sid, host, availableCpus(i)) match { + manager.slaveOffer(execId, host, availableCpus(i)) match { case Some(task) => tasks(i) += task val tid = task.taskId taskIdToTaskSetId(tid) = manager.taskSet.id taskSetTaskIds(manager.taskSet.id) += tid - taskIdToSlaveId(tid) = sid - slaveIdsWithExecutors += sid + taskIdToExecutorId(tid) = execId + activeExecutorIds += execId + if (!executorsByHost.contains(host)) { + executorsByHost(host) = new HashSet() + } + executorsByHost(host) += execId availableCpus(i) -= 1 launchedTask = true @@ -152,25 +156,21 @@ private[spark] class ClusterScheduler(val sc: SparkContext) def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) { var taskSetToUpdate: Option[TaskSetManager] = None - var failedHost: Option[String] = None + var failedExecutor: Option[String] = None var taskFailed = false synchronized { try { - if (state == TaskState.LOST && taskIdToSlaveId.contains(tid)) { - // We lost the executor on this slave, so remember that it's gone - val slaveId = taskIdToSlaveId(tid) - val host = slaveIdToHost(slaveId) - if (hostsAlive.contains(host)) { - slaveIdsWithExecutors -= slaveId - hostsAlive -= host - activeTaskSetsQueue.foreach(_.hostLost(host)) - failedHost = Some(host) + if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) { + // We lost this entire executor, so remember that it's gone + val execId = taskIdToExecutorId(tid) + if (activeExecutorIds.contains(execId)) { + removeExecutor(execId) + failedExecutor = Some(execId) } } taskIdToTaskSetId.get(tid) match { case Some(taskSetId) => if (activeTaskSets.contains(taskSetId)) { - //activeTaskSets(taskSetId).statusUpdate(status) taskSetToUpdate = Some(activeTaskSets(taskSetId)) } if (TaskState.isFinished(state)) { @@ -178,7 +178,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) if (taskSetTaskIds.contains(taskSetId)) { taskSetTaskIds(taskSetId) -= tid } - taskIdToSlaveId.remove(tid) + taskIdToExecutorId.remove(tid) } if (state == TaskState.FAILED) { taskFailed = true @@ -190,12 +190,12 @@ private[spark] class ClusterScheduler(val sc: SparkContext) case e: Exception => logError("Exception in statusUpdate", e) } } - // Update the task set and DAGScheduler without holding a lock on this, because that can deadlock + // Update the task set and DAGScheduler without holding a lock on this, since that can deadlock if (taskSetToUpdate != None) { taskSetToUpdate.get.statusUpdate(tid, state, serializedData) } - if (failedHost != None) { - listener.hostLost(failedHost.get) + if (failedExecutor != None) { + listener.executorLost(failedExecutor.get) backend.reviveOffers() } if (taskFailed) { @@ -249,32 +249,42 @@ private[spark] class ClusterScheduler(val sc: SparkContext) } } - def slaveLost(slaveId: String, reason: ExecutorLossReason) { - var failedHost: Option[String] = None + def executorLost(executorId: String, reason: ExecutorLossReason) { + var failedExecutor: Option[String] = None synchronized { - slaveIdToHost.get(slaveId) match { - case Some(host) => - if (hostsAlive.contains(host)) { - logError("Lost an executor on " + host + ": " + reason) - slaveIdsWithExecutors -= slaveId - hostsAlive -= host - activeTaskSetsQueue.foreach(_.hostLost(host)) - failedHost = Some(host) - } else { - // We may get multiple slaveLost() calls with different loss reasons. For example, one - // may be triggered by a dropped connection from the slave while another may be a report - // of executor termination from Mesos. We produce log messages for both so we eventually - // report the termination reason. - logError("Lost an executor on " + host + " (already removed): " + reason) - } - case None => - // We were told about a slave being lost before we could even allocate work to it - logError("Lost slave " + slaveId + " (no work assigned yet)") + if (activeExecutorIds.contains(executorId)) { + val host = executorIdToHost(executorId) + logError("Lost executor %s on %s: %s".format(executorId, host, reason)) + removeExecutor(executorId) + failedExecutor = Some(executorId) + } else { + // We may get multiple executorLost() calls with different loss reasons. For example, one + // may be triggered by a dropped connection from the slave while another may be a report + // of executor termination from Mesos. We produce log messages for both so we eventually + // report the termination reason. + logError("Lost an executor " + executorId + " (already removed): " + reason) } } - if (failedHost != None) { - listener.hostLost(failedHost.get) + // Call listener.executorLost without holding the lock on this to prevent deadlock + if (failedExecutor != None) { + listener.executorLost(failedExecutor.get) backend.reviveOffers() } } + + /** Get a list of hosts that currently have executors */ + def hostsAlive: scala.collection.Set[String] = executorsByHost.keySet + + /** Remove an executor from all our data structures and mark it as lost */ + private def removeExecutor(executorId: String) { + activeExecutorIds -= executorId + val host = executorIdToHost(executorId) + val execs = executorsByHost.getOrElse(host, new HashSet) + execs -= executorId + if (execs.isEmpty) { + executorsByHost -= host + } + executorIdToHost -= executorId + activeTaskSetsQueue.foreach(_.executorLost(executorId, host)) + } } diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 4f82cd96dd..f0792c1b76 100644 --- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -37,7 +37,7 @@ private[spark] class SparkDeploySchedulerBackend( val masterUrl = "akka://spark@%s:%s/user/%s".format( System.getProperty("spark.master.host"), System.getProperty("spark.master.port"), StandaloneSchedulerBackend.ACTOR_NAME) - val args = Seq(masterUrl, "{{SLAVEID}}", "{{HOSTNAME}}", "{{CORES}}") + val args = Seq(masterUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}") val command = Command("spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs) val sparkHome = sc.getSparkHome().getOrElse(throw new IllegalArgumentException("must supply spark home for spark standalone")) val jobDesc = new JobDescription(jobName, maxCores, executorMemory, command, sparkHome) @@ -81,7 +81,7 @@ private[spark] class SparkDeploySchedulerBackend( executorIdToSlaveId.get(id) match { case Some(slaveId) => executorIdToSlaveId.remove(id) - scheduler.slaveLost(slaveId, reason) + scheduler.executorLost(slaveId, reason) case None => logInfo("No slave ID known for executor %s".format(id)) } diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index eeaae23dc8..32be1e7a26 100644 --- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -28,8 +28,8 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor val slaveAddress = new HashMap[String, Address] val slaveHost = new HashMap[String, String] val freeCores = new HashMap[String, Int] - val actorToSlaveId = new HashMap[ActorRef, String] - val addressToSlaveId = new HashMap[Address, String] + val actorToExecutorId = new HashMap[ActorRef, String] + val addressToExecutorId = new HashMap[Address, String] override def preStart() { // Listen for remote client disconnection events, since they don't go through Akka's watch() @@ -37,28 +37,28 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor } def receive = { - case RegisterSlave(slaveId, host, cores) => - if (slaveActor.contains(slaveId)) { - sender ! RegisterSlaveFailed("Duplicate slave ID: " + slaveId) + case RegisterSlave(executorId, host, cores) => + if (slaveActor.contains(executorId)) { + sender ! RegisterSlaveFailed("Duplicate executor ID: " + executorId) } else { - logInfo("Registered slave: " + sender + " with ID " + slaveId) + logInfo("Registered executor: " + sender + " with ID " + executorId) sender ! RegisteredSlave(sparkProperties) context.watch(sender) - slaveActor(slaveId) = sender - slaveHost(slaveId) = host - freeCores(slaveId) = cores - slaveAddress(slaveId) = sender.path.address - actorToSlaveId(sender) = slaveId - addressToSlaveId(sender.path.address) = slaveId + slaveActor(executorId) = sender + slaveHost(executorId) = host + freeCores(executorId) = cores + slaveAddress(executorId) = sender.path.address + actorToExecutorId(sender) = executorId + addressToExecutorId(sender.path.address) = executorId totalCoreCount.addAndGet(cores) makeOffers() } - case StatusUpdate(slaveId, taskId, state, data) => + case StatusUpdate(executorId, taskId, state, data) => scheduler.statusUpdate(taskId, state, data.value) if (TaskState.isFinished(state)) { - freeCores(slaveId) += 1 - makeOffers(slaveId) + freeCores(executorId) += 1 + makeOffers(executorId) } case ReviveOffers => @@ -69,13 +69,13 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor context.stop(self) case Terminated(actor) => - actorToSlaveId.get(actor).foreach(removeSlave(_, "Akka actor terminated")) + actorToExecutorId.get(actor).foreach(removeSlave(_, "Akka actor terminated")) case RemoteClientDisconnected(transport, address) => - addressToSlaveId.get(address).foreach(removeSlave(_, "remote Akka client disconnected")) + addressToExecutorId.get(address).foreach(removeSlave(_, "remote Akka client disconnected")) case RemoteClientShutdown(transport, address) => - addressToSlaveId.get(address).foreach(removeSlave(_, "remote Akka client shutdown")) + addressToExecutorId.get(address).foreach(removeSlave(_, "remote Akka client shutdown")) } // Make fake resource offers on all slaves @@ -85,31 +85,31 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor } // Make fake resource offers on just one slave - def makeOffers(slaveId: String) { + def makeOffers(executorId: String) { launchTasks(scheduler.resourceOffers( - Seq(new WorkerOffer(slaveId, slaveHost(slaveId), freeCores(slaveId))))) + Seq(new WorkerOffer(executorId, slaveHost(executorId), freeCores(executorId))))) } // Launch tasks returned by a set of resource offers def launchTasks(tasks: Seq[Seq[TaskDescription]]) { for (task <- tasks.flatten) { - freeCores(task.slaveId) -= 1 - slaveActor(task.slaveId) ! LaunchTask(task) + freeCores(task.executorId) -= 1 + slaveActor(task.executorId) ! LaunchTask(task) } } // Remove a disconnected slave from the cluster - def removeSlave(slaveId: String, reason: String) { - logInfo("Slave " + slaveId + " disconnected, so removing it") - val numCores = freeCores(slaveId) - actorToSlaveId -= slaveActor(slaveId) - addressToSlaveId -= slaveAddress(slaveId) - slaveActor -= slaveId - slaveHost -= slaveId - freeCores -= slaveId - slaveHost -= slaveId + def removeSlave(executorId: String, reason: String) { + logInfo("Slave " + executorId + " disconnected, so removing it") + val numCores = freeCores(executorId) + actorToExecutorId -= slaveActor(executorId) + addressToExecutorId -= slaveAddress(executorId) + slaveActor -= executorId + slaveHost -= executorId + freeCores -= executorId + slaveHost -= executorId totalCoreCount.addAndGet(-numCores) - scheduler.slaveLost(slaveId, SlaveLost(reason)) + scheduler.executorLost(executorId, SlaveLost(reason)) } } diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala index aa097fd3a2..b41e951be9 100644 --- a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala +++ b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala @@ -5,7 +5,7 @@ import spark.util.SerializableBuffer private[spark] class TaskDescription( val taskId: Long, - val slaveId: String, + val executorId: String, val name: String, _serializedTask: ByteBuffer) extends Serializable { diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala b/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala index ca84503780..0f975ce1eb 100644 --- a/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala +++ b/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala @@ -4,7 +4,12 @@ package spark.scheduler.cluster * Information about a running task attempt inside a TaskSet. */ private[spark] -class TaskInfo(val taskId: Long, val index: Int, val launchTime: Long, val host: String) { +class TaskInfo( + val taskId: Long, + val index: Int, + val launchTime: Long, + val executorId: String, + val host: String) { var finishTime: Long = 0 var failed = false diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala index a089b71644..26201ad0dd 100644 --- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala +++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala @@ -138,10 +138,11 @@ private[spark] class TaskSetManager( // attempt running on this host, in case the host is slow. In addition, if localOnly is set, the // task must have a preference for this host (or no preferred locations at all). def findSpeculativeTask(host: String, localOnly: Boolean): Option[Int] = { + val hostsAlive = sched.hostsAlive speculatableTasks.retain(index => !finished(index)) // Remove finished tasks from set val localTask = speculatableTasks.find { index => - val locations = tasks(index).preferredLocations.toSet & sched.hostsAlive + val locations = tasks(index).preferredLocations.toSet & hostsAlive val attemptLocs = taskAttempts(index).map(_.host) (locations.size == 0 || locations.contains(host)) && !attemptLocs.contains(host) } @@ -189,7 +190,7 @@ private[spark] class TaskSetManager( } // Respond to an offer of a single slave from the scheduler by finding a task - def slaveOffer(slaveId: String, host: String, availableCpus: Double): Option[TaskDescription] = { + def slaveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = { if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) { val time = System.currentTimeMillis val localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT) @@ -206,11 +207,11 @@ private[spark] class TaskSetManager( } else { "non-preferred, not one of " + task.preferredLocations.mkString(", ") } - logInfo("Starting task %s:%d as TID %s on slave %s: %s (%s)".format( - taskSet.id, index, taskId, slaveId, host, prefStr)) + logInfo("Starting task %s:%d as TID %s on executor %s: %s (%s)".format( + taskSet.id, index, taskId, execId, host, prefStr)) // Do various bookkeeping copiesRunning(index) += 1 - val info = new TaskInfo(taskId, index, time, host) + val info = new TaskInfo(taskId, index, time, execId, host) taskInfos(taskId) = info taskAttempts(index) = info :: taskAttempts(index) if (preferred) { @@ -224,7 +225,7 @@ private[spark] class TaskSetManager( logInfo("Serialized task %s:%d as %d bytes in %d ms".format( taskSet.id, index, serializedTask.limit, timeTaken)) val taskName = "task %s:%d".format(taskSet.id, index) - return Some(new TaskDescription(taskId, slaveId, taskName, serializedTask)) + return Some(new TaskDescription(taskId, execId, taskName, serializedTask)) } case _ => } @@ -356,19 +357,22 @@ private[spark] class TaskSetManager( sched.taskSetFinished(this) } - def hostLost(hostname: String) { - logInfo("Re-queueing tasks for " + hostname + " from TaskSet " + taskSet.id) - // If some task has preferred locations only on hostname, put it in the no-prefs list - // to avoid the wait from delay scheduling - for (index <- getPendingTasksForHost(hostname)) { - val newLocs = tasks(index).preferredLocations.toSet & sched.hostsAlive - if (newLocs.isEmpty) { - pendingTasksWithNoPrefs += index + def executorLost(execId: String, hostname: String) { + logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id) + val newHostsAlive = sched.hostsAlive + // If some task has preferred locations only on hostname, and there are no more executors there, + // put it in the no-prefs list to avoid the wait from delay scheduling + if (!newHostsAlive.contains(hostname)) { + for (index <- getPendingTasksForHost(hostname)) { + val newLocs = tasks(index).preferredLocations.toSet & newHostsAlive + if (newLocs.isEmpty) { + pendingTasksWithNoPrefs += index + } } } - // Re-enqueue any tasks that ran on the failed host if this is a shuffle map stage + // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage if (tasks(0).isInstanceOf[ShuffleMapTask]) { - for ((tid, info) <- taskInfos if info.host == hostname) { + for ((tid, info) <- taskInfos if info.executorId == execId) { val index = taskInfos(tid).index if (finished(index)) { finished(index) = false @@ -382,7 +386,7 @@ private[spark] class TaskSetManager( } } // Also re-enqueue any tasks that were running on the node - for ((tid, info) <- taskInfos if info.running && info.host == hostname) { + for ((tid, info) <- taskInfos if info.running && info.executorId == execId) { taskLost(tid, TaskState.KILLED, null) } } diff --git a/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala b/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala index 6b919d68b2..3c3afcbb14 100644 --- a/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala +++ b/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala @@ -1,8 +1,8 @@ package spark.scheduler.cluster /** - * Represents free resources available on a worker node. + * Represents free resources available on an executor. */ private[spark] -class WorkerOffer(val slaveId: String, val hostname: String, val cores: Int) { +class WorkerOffer(val executorId: String, val hostname: String, val cores: Int) { } diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala index 2989e31f5e..f3467db86b 100644 --- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala @@ -268,7 +268,7 @@ private[spark] class MesosSchedulerBackend( synchronized { slaveIdsWithExecutors -= slaveId.getValue } - scheduler.slaveLost(slaveId.getValue, reason) + scheduler.executorLost(slaveId.getValue, reason) } override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) { diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 19d35b8667..1215d5f5c8 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -30,6 +30,7 @@ extends Exception(message) private[spark] class BlockManager( + executorId: String, actorSystem: ActorSystem, val master: BlockManagerMaster, val serializer: Serializer, @@ -68,8 +69,8 @@ class BlockManager( val connectionManager = new ConnectionManager(0) implicit val futureExecContext = connectionManager.futureExecContext - val connectionManagerId = connectionManager.id - val blockManagerId = BlockManagerId(connectionManagerId.host, connectionManagerId.port) + val blockManagerId = BlockManagerId( + executorId, connectionManager.id.host, connectionManager.id.port) // Max megabytes of data to keep in flight per reducer (to avoid over-allocating memory // for receiving shuffle outputs) @@ -109,8 +110,9 @@ class BlockManager( /** * Construct a BlockManager with a memory limit set based on system properties. */ - def this(actorSystem: ActorSystem, master: BlockManagerMaster, serializer: Serializer) = { - this(actorSystem, master, serializer, BlockManager.getMaxMemoryFromSystemProperties) + def this(execId: String, actorSystem: ActorSystem, master: BlockManagerMaster, + serializer: Serializer) = { + this(execId, actorSystem, master, serializer, BlockManager.getMaxMemoryFromSystemProperties) } /** diff --git a/core/src/main/scala/spark/storage/BlockManagerId.scala b/core/src/main/scala/spark/storage/BlockManagerId.scala index abb8b45a1f..f2f1e77d41 100644 --- a/core/src/main/scala/spark/storage/BlockManagerId.scala +++ b/core/src/main/scala/spark/storage/BlockManagerId.scala @@ -7,27 +7,32 @@ import java.util.concurrent.ConcurrentHashMap * This class represent an unique identifier for a BlockManager. * The first 2 constructors of this class is made private to ensure that * BlockManagerId objects can be created only using the factory method in - * [[spark.storage.BlockManager$]]. This allows de-duplication of id objects. + * [[spark.storage.BlockManager$]]. This allows de-duplication of ID objects. * Also, constructor parameters are private to ensure that parameters cannot * be modified from outside this class. */ private[spark] class BlockManagerId private ( + private var executorId_ : String, private var ip_ : String, private var port_ : Int ) extends Externalizable { - private def this() = this(null, 0) // For deserialization only + private def this() = this(null, null, 0) // For deserialization only - def ip = ip_ + def executorId: String = executorId_ - def port = port_ + def ip: String = ip_ + + def port: Int = port_ override def writeExternal(out: ObjectOutput) { + out.writeUTF(executorId_) out.writeUTF(ip_) out.writeInt(port_) } override def readExternal(in: ObjectInput) { + executorId_ = in.readUTF() ip_ = in.readUTF() port_ = in.readInt() } @@ -35,21 +40,23 @@ private[spark] class BlockManagerId private ( @throws(classOf[IOException]) private def readResolve(): Object = BlockManagerId.getCachedBlockManagerId(this) - override def toString = "BlockManagerId(" + ip + ", " + port + ")" + override def toString = "BlockManagerId(%s, %s, %d)".format(executorId, ip, port) - override def hashCode = ip.hashCode * 41 + port + override def hashCode: Int = (executorId.hashCode * 41 + ip.hashCode) * 41 + port override def equals(that: Any) = that match { - case id: BlockManagerId => port == id.port && ip == id.ip - case _ => false + case id: BlockManagerId => + executorId == id.executorId && port == id.port && ip == id.ip + case _ => + false } } private[spark] object BlockManagerId { - def apply(ip: String, port: Int) = - getCachedBlockManagerId(new BlockManagerId(ip, port)) + def apply(execId: String, ip: String, port: Int) = + getCachedBlockManagerId(new BlockManagerId(execId, ip, port)) def apply(in: ObjectInput) = { val obj = new BlockManagerId() diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index 937115e92c..55ff1dde9c 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -24,7 +24,7 @@ private[spark] class BlockManagerMaster( masterPort: Int) extends Logging { - val AKKA_RETRY_ATTEMPS: Int = System.getProperty("spark.akka.num.retries", "3").toInt + val AKKA_RETRY_ATTEMPTS: Int = System.getProperty("spark.akka.num.retries", "3").toInt val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "3000").toInt val MASTER_AKKA_ACTOR_NAME = "BlockMasterManager" @@ -45,10 +45,10 @@ private[spark] class BlockManagerMaster( } } - /** Remove a dead host from the master actor. This is only called on the master side. */ - def notifyADeadHost(host: String) { - tell(RemoveHost(host)) - logInfo("Removed " + host + " successfully in notifyADeadHost") + /** Remove a dead executor from the master actor. This is only called on the master side. */ + def removeExecutor(execId: String) { + tell(RemoveExecutor(execId)) + logInfo("Removed " + execId + " successfully in removeExecutor") } /** @@ -146,7 +146,7 @@ private[spark] class BlockManagerMaster( } var attempts = 0 var lastException: Exception = null - while (attempts < AKKA_RETRY_ATTEMPS) { + while (attempts < AKKA_RETRY_ATTEMPTS) { attempts += 1 try { val future = masterActor.ask(message)(timeout) diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala index b31b6286d3..f88517f1a3 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala @@ -23,9 +23,8 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { private val blockManagerInfo = new HashMap[BlockManagerId, BlockManagerMasterActor.BlockManagerInfo] - // Mapping from host name to block manager id. We allow multiple block managers - // on the same host name (ip). - private val blockManagerIdByHost = new HashMap[String, ArrayBuffer[BlockManagerId]] + // Mapping from executor ID to block manager ID. + private val blockManagerIdByExecutor = new HashMap[String, BlockManagerId] // Mapping from block id to the set of block managers that have the block. private val blockLocations = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]] @@ -74,8 +73,8 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { case RemoveBlock(blockId) => removeBlock(blockId) - case RemoveHost(host) => - removeHost(host) + case RemoveExecutor(execId) => + removeExecutor(execId) sender ! true case StopBlockManagerMaster => @@ -99,16 +98,12 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { def removeBlockManager(blockManagerId: BlockManagerId) { val info = blockManagerInfo(blockManagerId) - // Remove the block manager from blockManagerIdByHost. If the list of block - // managers belonging to the IP is empty, remove the entry from the hash map. - blockManagerIdByHost.get(blockManagerId.ip).foreach { managers: ArrayBuffer[BlockManagerId] => - managers -= blockManagerId - if (managers.size == 0) blockManagerIdByHost.remove(blockManagerId.ip) - } + // Remove the block manager from blockManagerIdByExecutor. + blockManagerIdByExecutor -= blockManagerId.executorId // Remove it from blockManagerInfo and remove all the blocks. blockManagerInfo.remove(blockManagerId) - var iterator = info.blocks.keySet.iterator + val iterator = info.blocks.keySet.iterator while (iterator.hasNext) { val blockId = iterator.next val locations = blockLocations.get(blockId)._2 @@ -133,17 +128,15 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { toRemove.foreach(removeBlockManager) } - def removeHost(host: String) { - logInfo("Trying to remove the host: " + host + " from BlockManagerMaster.") - logInfo("Previous hosts: " + blockManagerInfo.keySet.toSeq) - blockManagerIdByHost.get(host).foreach(_.foreach(removeBlockManager)) - logInfo("Current hosts: " + blockManagerInfo.keySet.toSeq) + def removeExecutor(execId: String) { + logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.") + blockManagerIdByExecutor.get(execId).foreach(removeBlockManager) sender ! true } def heartBeat(blockManagerId: BlockManagerId) { if (!blockManagerInfo.contains(blockManagerId)) { - if (blockManagerId.ip == Utils.localHostName() && !isLocal) { + if (blockManagerId.executorId == "<driver>" && !isLocal) { sender ! true } else { sender ! false @@ -188,24 +181,20 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { sender ! res } - private def register(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) { - val startTimeMs = System.currentTimeMillis() - val tmp = " " + blockManagerId + " " - - if (blockManagerId.ip == Utils.localHostName() && !isLocal) { - logInfo("Got Register Msg from master node, don't register it") - } else if (!blockManagerInfo.contains(blockManagerId)) { - blockManagerIdByHost.get(blockManagerId.ip) match { - case Some(managers) => - // A block manager of the same host name already exists. - logInfo("Got another registration for host " + blockManagerId) - managers += blockManagerId + private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) { + if (id.executorId == "<driver>" && !isLocal) { + // Got a register message from the master node; don't register it + } else if (!blockManagerInfo.contains(id)) { + blockManagerIdByExecutor.get(id.executorId) match { + case Some(manager) => + // A block manager of the same host name already exists + logError("Got two different block manager registrations on " + id.executorId) + System.exit(1) case None => - blockManagerIdByHost += (blockManagerId.ip -> ArrayBuffer(blockManagerId)) + blockManagerIdByExecutor(id.executorId) = id } - - blockManagerInfo += (blockManagerId -> new BlockManagerMasterActor.BlockManagerInfo( - blockManagerId, System.currentTimeMillis(), maxMemSize, slaveActor)) + blockManagerInfo(id) = new BlockManagerMasterActor.BlockManagerInfo( + id, System.currentTimeMillis(), maxMemSize, slaveActor) } sender ! true } @@ -217,11 +206,8 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { memSize: Long, diskSize: Long) { - val startTimeMs = System.currentTimeMillis() - val tmp = " " + blockManagerId + " " + blockId + " " - if (!blockManagerInfo.contains(blockManagerId)) { - if (blockManagerId.ip == Utils.localHostName() && !isLocal) { + if (blockManagerId.executorId == "<driver>" && !isLocal) { // We intentionally do not register the master (except in local mode), // so we should not indicate failure. sender ! true @@ -353,8 +339,8 @@ object BlockManagerMasterActor { _lastSeenMs = System.currentTimeMillis() } - def updateBlockInfo(blockId: String, storageLevel: StorageLevel, memSize: Long, diskSize: Long) - : Unit = synchronized { + def updateBlockInfo(blockId: String, storageLevel: StorageLevel, memSize: Long, + diskSize: Long) { updateLastSeenMs() diff --git a/core/src/main/scala/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/spark/storage/BlockManagerMessages.scala index 3d03ff3a93..1494f90103 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMessages.scala @@ -88,7 +88,7 @@ private[spark] case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster private[spark] -case class RemoveHost(host: String) extends ToBlockManagerMaster +case class RemoveExecutor(execId: String) extends ToBlockManagerMaster private[spark] case object StopBlockManagerMaster extends ToBlockManagerMaster diff --git a/core/src/main/scala/spark/storage/BlockManagerUI.scala b/core/src/main/scala/spark/storage/BlockManagerUI.scala index 1003cc7a61..b7423c7234 100644 --- a/core/src/main/scala/spark/storage/BlockManagerUI.scala +++ b/core/src/main/scala/spark/storage/BlockManagerUI.scala @@ -11,6 +11,7 @@ import cc.spray.typeconversion.TwirlSupport._ import scala.collection.mutable.ArrayBuffer import spark.{Logging, SparkContext, SparkEnv} import spark.util.AkkaUtils +import spark.Utils private[spark] @@ -20,10 +21,10 @@ object BlockManagerUI extends Logging { def start(actorSystem : ActorSystem, masterActor: ActorRef, sc: SparkContext) { val webUIDirectives = new BlockManagerUIDirectives(actorSystem, masterActor, sc) try { - logInfo("Starting BlockManager WebUI.") - val port = Option(System.getenv("BLOCKMANAGER_UI_PORT")).getOrElse("9080").toInt - AkkaUtils.startSprayServer(actorSystem, "0.0.0.0", port, + val boundPort = AkkaUtils.startSprayServer(actorSystem, "0.0.0.0", + Option(System.getenv("BLOCKMANAGER_UI_PORT")).getOrElse("9080").toInt, webUIDirectives.handler, "BlockManagerHTTPServer") + logInfo("Started BlockManager web UI at %s:%d".format(Utils.localHostName(), boundPort)) } catch { case e: Exception => logError("Failed to create BlockManager WebUI", e) diff --git a/core/src/main/scala/spark/storage/ThreadingTest.scala b/core/src/main/scala/spark/storage/ThreadingTest.scala index 689f07b969..f04c046c31 100644 --- a/core/src/main/scala/spark/storage/ThreadingTest.scala +++ b/core/src/main/scala/spark/storage/ThreadingTest.scala @@ -78,7 +78,8 @@ private[spark] object ThreadingTest { val masterIp: String = System.getProperty("spark.master.host", "localhost") val masterPort: Int = System.getProperty("spark.master.port", "7077").toInt val blockManagerMaster = new BlockManagerMaster(actorSystem, true, true, masterIp, masterPort) - val blockManager = new BlockManager(actorSystem, blockManagerMaster, serializer, 1024 * 1024) + val blockManager = new BlockManager( + "<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024) val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i)) val consumers = producers.map(p => new ConsumerThread(blockManager, p.queue)) producers.foreach(_.start) diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala index ff2c3079be..775ff8f1aa 100644 --- a/core/src/main/scala/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/spark/util/AkkaUtils.scala @@ -52,10 +52,10 @@ private[spark] object AkkaUtils { /** * Creates a Spray HTTP server bound to a given IP and port with a given Spray Route object to - * handle requests. Throws a SparkException if this fails. + * handle requests. Returns the bound port or throws a SparkException on failure. */ def startSprayServer(actorSystem: ActorSystem, ip: String, port: Int, route: Route, - name: String = "HttpServer") { + name: String = "HttpServer"): Int = { val ioWorker = new IoWorker(actorSystem).start() val httpService = actorSystem.actorOf(Props(new HttpService(route))) val rootService = actorSystem.actorOf(Props(new SprayCanRootService(httpService))) @@ -67,7 +67,7 @@ private[spark] object AkkaUtils { try { Await.result(future, timeout) match { case bound: HttpServer.Bound => - return + return bound.endpoint.getPort case other: Any => throw new SparkException("Failed to bind web UI to port " + port + ": " + other) } diff --git a/core/src/main/scala/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/spark/util/TimeStampedHashMap.scala index bb7c5c01c8..188f8910da 100644 --- a/core/src/main/scala/spark/util/TimeStampedHashMap.scala +++ b/core/src/main/scala/spark/util/TimeStampedHashMap.scala @@ -63,9 +63,9 @@ class TimeStampedHashMap[A, B] extends Map[A, B]() with spark.Logging { override def empty: Map[A, B] = new TimeStampedHashMap[A, B]() - override def size(): Int = internalMap.size() + override def size: Int = internalMap.size - override def foreach[U](f: ((A, B)) => U): Unit = { + override def foreach[U](f: ((A, B)) => U) { val iterator = internalMap.entrySet().iterator() while(iterator.hasNext) { val entry = iterator.next() diff --git a/core/src/test/scala/spark/DriverSuite.scala b/core/src/test/scala/spark/DriverSuite.scala index 70a7c8bc2f..342610e1dd 100644 --- a/core/src/test/scala/spark/DriverSuite.scala +++ b/core/src/test/scala/spark/DriverSuite.scala @@ -13,7 +13,8 @@ class DriverSuite extends FunSuite with Timeouts { val masters = Table(("master"), ("local"), ("local-cluster[2,1,512]")) forAll(masters) { (master: String) => failAfter(10 seconds) { - Utils.execute(Seq("./run", "spark.DriverWithoutCleanup", master), new File(System.getenv("SPARK_HOME"))) + Utils.execute(Seq("./run", "spark.DriverWithoutCleanup", master), + new File(System.getenv("SPARK_HOME"))) } } } @@ -28,4 +29,4 @@ object DriverWithoutCleanup { val sc = new SparkContext(args(0), "DriverWithoutCleanup") sc.parallelize(1 to 100, 4).count() } -}
\ No newline at end of file +} diff --git a/core/src/test/scala/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/spark/MapOutputTrackerSuite.scala index 7d5305f1e0..e8fe7ecabc 100644 --- a/core/src/test/scala/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/spark/MapOutputTrackerSuite.scala @@ -43,13 +43,13 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { val compressedSize10000 = MapOutputTracker.compressSize(10000L) val size1000 = MapOutputTracker.decompressSize(compressedSize1000) val size10000 = MapOutputTracker.decompressSize(compressedSize10000) - tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("hostA", 1000), + tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000), Array(compressedSize1000, compressedSize10000))) - tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("hostB", 1000), + tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000), Array(compressedSize10000, compressedSize1000))) val statuses = tracker.getServerStatuses(10, 0) - assert(statuses.toSeq === Seq((BlockManagerId("hostA", 1000), size1000), - (BlockManagerId("hostB", 1000), size10000))) + assert(statuses.toSeq === Seq((BlockManagerId("a", "hostA", 1000), size1000), + (BlockManagerId("b", "hostB", 1000), size10000))) tracker.stop() } @@ -61,47 +61,52 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { val compressedSize10000 = MapOutputTracker.compressSize(10000L) val size1000 = MapOutputTracker.decompressSize(compressedSize1000) val size10000 = MapOutputTracker.decompressSize(compressedSize10000) - tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("hostA", 1000), + tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000), Array(compressedSize1000, compressedSize1000, compressedSize1000))) - tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("hostB", 1000), + tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000), Array(compressedSize10000, compressedSize1000, compressedSize1000))) // As if we had two simulatenous fetch failures - tracker.unregisterMapOutput(10, 0, BlockManagerId("hostA", 1000)) - tracker.unregisterMapOutput(10, 0, BlockManagerId("hostA", 1000)) + tracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000)) + tracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000)) - // The remaining reduce task might try to grab the output dispite the shuffle failure; + // The remaining reduce task might try to grab the output despite the shuffle failure; // this should cause it to fail, and the scheduler will ignore the failure due to the // stage already being aborted. intercept[FetchFailedException] { tracker.getServerStatuses(10, 1) } } test("remote fetch") { - val (actorSystem, boundPort) = - AkkaUtils.createActorSystem("test", "localhost", 0) - System.setProperty("spark.master.port", boundPort.toString) - val masterTracker = new MapOutputTracker(actorSystem, true) - val slaveTracker = new MapOutputTracker(actorSystem, false) - masterTracker.registerShuffle(10, 1) - masterTracker.incrementGeneration() - slaveTracker.updateGeneration(masterTracker.getGeneration) - intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) } + try { + System.clearProperty("spark.master.host") // In case some previous test had set it + val (actorSystem, boundPort) = + AkkaUtils.createActorSystem("test", "localhost", 0) + System.setProperty("spark.master.port", boundPort.toString) + val masterTracker = new MapOutputTracker(actorSystem, true) + val slaveTracker = new MapOutputTracker(actorSystem, false) + masterTracker.registerShuffle(10, 1) + masterTracker.incrementGeneration() + slaveTracker.updateGeneration(masterTracker.getGeneration) + intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) } - val compressedSize1000 = MapOutputTracker.compressSize(1000L) - val size1000 = MapOutputTracker.decompressSize(compressedSize1000) - masterTracker.registerMapOutput(10, 0, new MapStatus( - BlockManagerId("hostA", 1000), Array(compressedSize1000))) - masterTracker.incrementGeneration() - slaveTracker.updateGeneration(masterTracker.getGeneration) - assert(slaveTracker.getServerStatuses(10, 0).toSeq === - Seq((BlockManagerId("hostA", 1000), size1000))) + val compressedSize1000 = MapOutputTracker.compressSize(1000L) + val size1000 = MapOutputTracker.decompressSize(compressedSize1000) + masterTracker.registerMapOutput(10, 0, new MapStatus( + BlockManagerId("a", "hostA", 1000), Array(compressedSize1000))) + masterTracker.incrementGeneration() + slaveTracker.updateGeneration(masterTracker.getGeneration) + assert(slaveTracker.getServerStatuses(10, 0).toSeq === + Seq((BlockManagerId("a", "hostA", 1000), size1000))) - masterTracker.unregisterMapOutput(10, 0, BlockManagerId("hostA", 1000)) - masterTracker.incrementGeneration() - slaveTracker.updateGeneration(masterTracker.getGeneration) - intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) } + masterTracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000)) + masterTracker.incrementGeneration() + slaveTracker.updateGeneration(masterTracker.getGeneration) + intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) } - // failure should be cached - intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) } + // failure should be cached + intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) } + } finally { + System.clearProperty("spark.master.port") + } } } diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala index 2165744689..2d177bbf67 100644 --- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala @@ -86,9 +86,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("BlockManagerId object caching") { - val id1 = BlockManagerId("XXX", 1) - val id2 = BlockManagerId("XXX", 1) // this should return the same object as id1 - val id3 = BlockManagerId("XXX", 2) // this should return a different object + val id1 = BlockManagerId("e1", "XXX", 1) + val id2 = BlockManagerId("e1", "XXX", 1) // this should return the same object as id1 + val id3 = BlockManagerId("e1", "XXX", 2) // this should return a different object assert(id2 === id1, "id2 is not same as id1") assert(id2.eq(id1), "id2 is not the same object as id1") assert(id3 != id1, "id3 is same as id1") @@ -103,7 +103,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("master + 1 manager interaction") { - store = new BlockManager(actorSystem, master, serializer, 2000) + store = new BlockManager("<driver>", actorSystem, master, serializer, 2000) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -133,8 +133,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("master + 2 managers interaction") { - store = new BlockManager(actorSystem, master, serializer, 2000) - store2 = new BlockManager(actorSystem, master, new KryoSerializer, 2000) + store = new BlockManager("exec1", actorSystem, master, serializer, 2000) + store2 = new BlockManager("exec2", actorSystem, master, new KryoSerializer, 2000) val peers = master.getPeers(store.blockManagerId, 1) assert(peers.size === 1, "master did not return the other manager as a peer") @@ -149,7 +149,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("removing block") { - store = new BlockManager(actorSystem, master, serializer, 2000) + store = new BlockManager("<driver>", actorSystem, master, serializer, 2000) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -198,7 +198,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT test("reregistration on heart beat") { val heartBeat = PrivateMethod[Unit]('heartBeat) - store = new BlockManager(actorSystem, master, serializer, 2000) + store = new BlockManager("<driver>", actorSystem, master, serializer, 2000) val a1 = new Array[Byte](400) store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) @@ -206,7 +206,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT assert(store.getSingle("a1") != None, "a1 was not in store") assert(master.getLocations("a1").size > 0, "master was not told about a1") - master.notifyADeadHost(store.blockManagerId.ip) + master.removeExecutor(store.blockManagerId.executorId) assert(master.getLocations("a1").size == 0, "a1 was not removed from master") store invokePrivate heartBeat() @@ -214,14 +214,14 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("reregistration on block update") { - store = new BlockManager(actorSystem, master, serializer, 2000) + store = new BlockManager("<driver>", actorSystem, master, serializer, 2000) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) assert(master.getLocations("a1").size > 0, "master was not told about a1") - master.notifyADeadHost(store.blockManagerId.ip) + master.removeExecutor(store.blockManagerId.executorId) assert(master.getLocations("a1").size == 0, "a1 was not removed from master") store.putSingle("a2", a1, StorageLevel.MEMORY_ONLY) @@ -233,35 +233,35 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT test("reregistration doesn't dead lock") { val heartBeat = PrivateMethod[Unit]('heartBeat) - store = new BlockManager(actorSystem, master, serializer, 2000) + store = new BlockManager("<driver>", actorSystem, master, serializer, 2000) val a1 = new Array[Byte](400) val a2 = List(new Array[Byte](400)) // try many times to trigger any deadlocks for (i <- 1 to 100) { - master.notifyADeadHost(store.blockManagerId.ip) + master.removeExecutor(store.blockManagerId.executorId) val t1 = new Thread { - override def run = { + override def run() { store.put("a2", a2.iterator, StorageLevel.MEMORY_ONLY, true) } } val t2 = new Thread { - override def run = { + override def run() { store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) } } val t3 = new Thread { - override def run = { + override def run() { store invokePrivate heartBeat() } } - t1.start - t2.start - t3.start - t1.join - t2.join - t3.join + t1.start() + t2.start() + t3.start() + t1.join() + t2.join() + t3.join() store.dropFromMemory("a1", null) store.dropFromMemory("a2", null) @@ -270,7 +270,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("in-memory LRU storage") { - store = new BlockManager(actorSystem, master, serializer, 1200) + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -289,7 +289,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("in-memory LRU storage with serialization") { - store = new BlockManager(actorSystem, master, serializer, 1200) + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -308,14 +308,14 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("in-memory LRU for partitions of same RDD") { - store = new BlockManager(actorSystem, master, serializer, 1200) + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) store.putSingle("rdd_0_1", a1, StorageLevel.MEMORY_ONLY) store.putSingle("rdd_0_2", a2, StorageLevel.MEMORY_ONLY) store.putSingle("rdd_0_3", a3, StorageLevel.MEMORY_ONLY) - // Even though we accessed rdd_0_3 last, it should not have replaced partitiosn 1 and 2 + // Even though we accessed rdd_0_3 last, it should not have replaced partitions 1 and 2 // from the same RDD assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store") assert(store.getSingle("rdd_0_2") != None, "rdd_0_2 was not in store") @@ -327,7 +327,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("in-memory LRU for partitions of multiple RDDs") { - store = new BlockManager(actorSystem, master, serializer, 1200) + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200) store.putSingle("rdd_0_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY) store.putSingle("rdd_0_2", new Array[Byte](400), StorageLevel.MEMORY_ONLY) store.putSingle("rdd_1_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY) @@ -350,7 +350,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("on-disk storage") { - store = new BlockManager(actorSystem, master, serializer, 1200) + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -363,7 +363,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("disk and memory storage") { - store = new BlockManager(actorSystem, master, serializer, 1200) + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -378,7 +378,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("disk and memory storage with getLocalBytes") { - store = new BlockManager(actorSystem, master, serializer, 1200) + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -393,7 +393,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("disk and memory storage with serialization") { - store = new BlockManager(actorSystem, master, serializer, 1200) + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -408,7 +408,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("disk and memory storage with serialization and getLocalBytes") { - store = new BlockManager(actorSystem, master, serializer, 1200) + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -423,7 +423,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("LRU with mixed storage levels") { - store = new BlockManager(actorSystem, master, serializer, 1200) + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -448,7 +448,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("in-memory LRU with streams") { - store = new BlockManager(actorSystem, master, serializer, 1200) + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200) val list1 = List(new Array[Byte](200), new Array[Byte](200)) val list2 = List(new Array[Byte](200), new Array[Byte](200)) val list3 = List(new Array[Byte](200), new Array[Byte](200)) @@ -472,7 +472,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("LRU with mixed storage levels and streams") { - store = new BlockManager(actorSystem, master, serializer, 1200) + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200) val list1 = List(new Array[Byte](200), new Array[Byte](200)) val list2 = List(new Array[Byte](200), new Array[Byte](200)) val list3 = List(new Array[Byte](200), new Array[Byte](200)) @@ -518,7 +518,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("overly large block") { - store = new BlockManager(actorSystem, master, serializer, 500) + store = new BlockManager("<driver>", actorSystem, master, serializer, 500) store.putSingle("a1", new Array[Byte](1000), StorageLevel.MEMORY_ONLY) assert(store.getSingle("a1") === None, "a1 was in store") store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK) @@ -529,49 +529,49 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT test("block compression") { try { System.setProperty("spark.shuffle.compress", "true") - store = new BlockManager(actorSystem, master, serializer, 2000) + store = new BlockManager("exec1", actorSystem, master, serializer, 2000) store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize("shuffle_0_0_0") <= 100, "shuffle_0_0_0 was not compressed") store.stop() store = null System.setProperty("spark.shuffle.compress", "false") - store = new BlockManager(actorSystem, master, serializer, 2000) + store = new BlockManager("exec2", actorSystem, master, serializer, 2000) store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize("shuffle_0_0_0") >= 1000, "shuffle_0_0_0 was compressed") store.stop() store = null System.setProperty("spark.broadcast.compress", "true") - store = new BlockManager(actorSystem, master, serializer, 2000) + store = new BlockManager("exec3", actorSystem, master, serializer, 2000) store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize("broadcast_0") <= 100, "broadcast_0 was not compressed") store.stop() store = null System.setProperty("spark.broadcast.compress", "false") - store = new BlockManager(actorSystem, master, serializer, 2000) + store = new BlockManager("exec4", actorSystem, master, serializer, 2000) store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize("broadcast_0") >= 1000, "broadcast_0 was compressed") store.stop() store = null System.setProperty("spark.rdd.compress", "true") - store = new BlockManager(actorSystem, master, serializer, 2000) + store = new BlockManager("exec5", actorSystem, master, serializer, 2000) store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize("rdd_0_0") <= 100, "rdd_0_0 was not compressed") store.stop() store = null System.setProperty("spark.rdd.compress", "false") - store = new BlockManager(actorSystem, master, serializer, 2000) + store = new BlockManager("exec6", actorSystem, master, serializer, 2000) store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize("rdd_0_0") >= 1000, "rdd_0_0 was compressed") store.stop() store = null // Check that any other block types are also kept uncompressed - store = new BlockManager(actorSystem, master, serializer, 2000) + store = new BlockManager("exec7", actorSystem, master, serializer, 2000) store.putSingle("other_block", new Array[Byte](1000), StorageLevel.MEMORY_ONLY) assert(store.memoryStore.getSize("other_block") >= 1000, "other_block was compressed") store.stop() |