author     haitao.yao <yao.erix@gmail.com>  2013-01-29 09:57:57 +0800
committer  haitao.yao <yao.erix@gmail.com>  2013-01-29 09:57:57 +0800
commit     4670c99a21debda40a9def5b69fc30da4938d99c (patch)
tree       70720ee52e00deceb89871cedde6e618e71f5158
parent     aeef6f6b0837f6bc510c683e400acd6b0f98e228 (diff)
parent     f03d9760fd8ac67fd0865cb355ba75d2eff507fe (diff)
Merge branch 'mesos'
-rwxr-xr-x  bin/start-master.sh | 3
-rw-r--r--  core/src/main/scala/spark/MapOutputTracker.scala | 4
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala | 13
-rw-r--r--  core/src/main/scala/spark/SparkEnv.scala | 9
-rw-r--r--  core/src/main/scala/spark/Utils.scala | 17
-rw-r--r--  core/src/main/scala/spark/deploy/LocalSparkCluster.scala | 16
-rw-r--r--  core/src/main/scala/spark/deploy/master/Master.scala | 4
-rw-r--r--  core/src/main/scala/spark/deploy/master/MasterWebUI.scala | 6
-rw-r--r--  core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala | 4
-rw-r--r--  core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala | 6
-rw-r--r--  core/src/main/scala/spark/executor/Executor.scala | 4
-rw-r--r--  core/src/main/scala/spark/executor/MesosExecutorBackend.scala | 3
-rw-r--r--  core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala | 24
-rw-r--r--  core/src/main/scala/spark/scheduler/DAGScheduler.scala | 44
-rw-r--r--  core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala | 2
-rw-r--r--  core/src/main/scala/spark/scheduler/MapStatus.scala | 6
-rw-r--r--  core/src/main/scala/spark/scheduler/Stage.scala | 11
-rw-r--r--  core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala | 2
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala | 110
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/SlaveResources.scala | 4
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 18
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala | 16
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala | 78
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala | 2
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala | 7
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala | 38
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala | 4
-rw-r--r--  core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala | 2
-rw-r--r--  core/src/main/scala/spark/storage/BlockManager.scala | 10
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerId.scala | 27
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerMaster.scala | 12
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerMasterActor.scala | 66
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerMessages.scala | 2
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerUI.scala | 119
-rw-r--r--  core/src/main/scala/spark/storage/ThreadingTest.scala | 3
-rw-r--r--  core/src/main/scala/spark/util/AkkaUtils.scala | 8
-rw-r--r--  core/src/main/scala/spark/util/MetadataCleaner.scala | 13
-rw-r--r--  core/src/main/scala/spark/util/TimeStampedHashMap.scala | 4
-rw-r--r--  core/src/test/scala/spark/DriverSuite.scala | 5
-rw-r--r--  core/src/test/scala/spark/MapOutputTrackerSuite.scala | 69
-rw-r--r--  core/src/test/scala/spark/storage/BlockManagerSuite.scala | 86
-rwxr-xr-x  sbt/sbt | 2
42 files changed, 460 insertions(+), 423 deletions(-)
diff --git a/bin/start-master.sh b/bin/start-master.sh
index a901b1c260..87feb261fe 100755
--- a/bin/start-master.sh
+++ b/bin/start-master.sh
@@ -26,7 +26,8 @@ fi
# Set SPARK_PUBLIC_DNS so the master report the correct webUI address to the slaves
if [ "$SPARK_PUBLIC_DNS" = "" ]; then
# If we appear to be running on EC2, use the public address by default:
- if [[ `hostname` == *ec2.internal ]]; then
+ # NOTE: ec2-metadata is installed on Amazon Linux AMI. Check based on that and hostname
+ if command -v ec2-metadata > /dev/null || [[ `hostname` == *ec2.internal ]]; then
export SPARK_PUBLIC_DNS=`wget -q -O - http://instance-data.ec2.internal/latest/meta-data/public-hostname`
fi
fi
diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
index ac02f3363a..c1f012b419 100644
--- a/core/src/main/scala/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/spark/MapOutputTracker.scala
@@ -114,7 +114,7 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
var array = mapStatuses(shuffleId)
if (array != null) {
array.synchronized {
- if (array(mapId) != null && array(mapId).address == bmAddress) {
+ if (array(mapId) != null && array(mapId).location == bmAddress) {
array(mapId) = null
}
}
@@ -277,7 +277,7 @@ private[spark] object MapOutputTracker {
throw new FetchFailedException(null, shuffleId, -1, reduceId,
new Exception("Missing an output location for shuffle " + shuffleId))
} else {
- (status.address, decompressSize(status.compressedSizes(reduceId)))
+ (status.location, decompressSize(status.compressedSizes(reduceId)))
}
}
}
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 4581c0adcf..77036c1275 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -44,6 +44,7 @@ import scheduler.{ResultTask, ShuffleMapTask, DAGScheduler, TaskScheduler}
import spark.scheduler.local.LocalScheduler
import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, ClusterScheduler}
import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
+import storage.BlockManagerUI
import util.{MetadataCleaner, TimeStampedHashMap}
/**
@@ -80,6 +81,7 @@ class SparkContext(
// Create the Spark execution environment (cache, map output tracker, etc)
private[spark] val env = SparkEnv.createFromSystemProperties(
+ "<driver>",
System.getProperty("spark.master.host"),
System.getProperty("spark.master.port").toInt,
true,
@@ -87,8 +89,9 @@ class SparkContext(
SparkEnv.set(env)
// Start the BlockManager UI
- spark.storage.BlockManagerUI.start(SparkEnv.get.actorSystem,
- SparkEnv.get.blockManager.master.masterActor, this)
+ private[spark] val ui = new BlockManagerUI(
+ env.actorSystem, env.blockManager.master.masterActor, this)
+ ui.start()
// Used to store a URL for each static file/jar together with the file's local timestamp
private[spark] val addedFiles = HashMap[String, Long]()
@@ -96,8 +99,7 @@ class SparkContext(
// Keeps track of all persisted RDDs
private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]()
-
- private[spark] val metadataCleaner = new MetadataCleaner("DAGScheduler", this.cleanup)
+ private[spark] val metadataCleaner = new MetadataCleaner("SparkContext", this.cleanup)
// Add each JAR given through the constructor
@@ -649,10 +651,9 @@ class SparkContext(
/** Register a new RDD, returning its RDD ID */
private[spark] def newRddId(): Int = nextRddId.getAndIncrement()
+ /** Called by MetadataCleaner to clean up the persistentRdds map periodically */
private[spark] def cleanup(cleanupTime: Long) {
- var sizeBefore = persistentRdds.size
persistentRdds.clearOldValues(cleanupTime)
- logInfo("idToStage " + sizeBefore + " --> " + persistentRdds.size)
}
}
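Note: the cleanup hook above is invoked periodically by MetadataCleaner to drop old entries from persistentRdds. A self-contained sketch of the timestamp-based eviction idea behind clearOldValues (names and types here are simplified, not Spark's TimeStampedHashMap):

    import scala.collection.mutable.HashMap

    // Map of key -> (value, insertion time); clearOldValues drops entries older than a cutoff.
    val entries = new HashMap[Int, (String, Long)]

    def put(key: Int, value: String) { entries(key) = (value, System.currentTimeMillis) }

    def clearOldValues(cutoff: Long) {
      entries.retain((key, value) => value._2 >= cutoff)
    }

    put(1, "rdd-1")
    clearOldValues(System.currentTimeMillis - 30 * 60 * 1000)  // keep entries newer than 30 minutes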
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 2a7a8af83d..0c094edcf3 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -19,6 +19,7 @@ import spark.util.AkkaUtils
* SparkEnv.get (e.g. after creating a SparkContext) and set it with SparkEnv.set.
*/
class SparkEnv (
+ val executorId: String,
val actorSystem: ActorSystem,
val serializer: Serializer,
val closureSerializer: Serializer,
@@ -58,11 +59,12 @@ object SparkEnv extends Logging {
}
def createFromSystemProperties(
+ executorId: String,
hostname: String,
port: Int,
isMaster: Boolean,
- isLocal: Boolean
- ) : SparkEnv = {
+ isLocal: Boolean): SparkEnv = {
+
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port)
// Bit of a hack: If this is the master and our port was 0 (meaning bind to any free port),
@@ -86,7 +88,7 @@ object SparkEnv extends Logging {
val masterPort: Int = System.getProperty("spark.master.port", "7077").toInt
val blockManagerMaster = new BlockManagerMaster(
actorSystem, isMaster, isLocal, masterIp, masterPort)
- val blockManager = new BlockManager(actorSystem, blockManagerMaster, serializer)
+ val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer)
val connectionManager = blockManager.connectionManager
@@ -122,6 +124,7 @@ object SparkEnv extends Logging {
}
new SparkEnv(
+ executorId,
actorSystem,
serializer,
closureSerializer,
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index ae77264372..1e58d01273 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -1,7 +1,7 @@
package spark
import java.io._
-import java.net.{NetworkInterface, InetAddress, Inet4Address, URL, URI}
+import java.net._
import java.util.{Locale, Random, UUID}
import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor}
import org.apache.hadoop.conf.Configuration
@@ -11,6 +11,7 @@ import scala.collection.JavaConversions._
import scala.io.Source
import com.google.common.io.Files
import com.google.common.util.concurrent.ThreadFactoryBuilder
+import scala.Some
/**
* Various utility methods used by Spark.
@@ -431,4 +432,18 @@ private object Utils extends Logging {
}
"%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine)
}
+
+ /**
+ * Try to find a free port to bind to on the local host. This should ideally never be needed,
+ * except that, unfortunately, some of the networking libraries we currently rely on (e.g. Spray)
+ * don't let users bind to port 0 and then figure out which free port they actually bound to.
+ * We work around this by binding a ServerSocket and immediately unbinding it. This is *not*
+ * necessarily guaranteed to work, but it's the best we can do.
+ */
+ def findFreePort(): Int = {
+ val socket = new ServerSocket(0)
+ val portBound = socket.getLocalPort
+ socket.close()
+ portBound
+ }
}
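Note: Utils.findFreePort() above works around libraries (such as Spray here) that cannot bind to port 0 and report back the port they actually got. A minimal self-contained sketch of the same probe-and-release technique (probeFreePort is an illustrative name, not part of the patch):

    import java.net.ServerSocket

    // Ask the OS for an ephemeral port, then release it so another server can bind it.
    // Inherently racy: another process may grab the port between close() and the re-bind.
    def probeFreePort(): Int = {
      val socket = new ServerSocket(0)
      try socket.getLocalPort
      finally socket.close()
    }

    val webUiPort = probeFreePort()  // e.g. hand this to a server that insists on a concrete port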
diff --git a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
index 4211d80596..8f51051e39 100644
--- a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
@@ -9,6 +9,12 @@ import spark.{Logging, Utils}
import scala.collection.mutable.ArrayBuffer
+/**
+ * Testing class that creates a Spark standalone process in-cluster (that is, running the
+ * spark.deploy.master.Master and spark.deploy.worker.Workers in the same JVMs). Executors launched
+ * by the Workers still run in separate JVMs. This can be used to test distributed operation and
+ * fault recovery without spinning up a lot of processes.
+ */
private[spark]
class LocalSparkCluster(numSlaves: Int, coresPerSlave: Int, memoryPerSlave: Int) extends Logging {
@@ -35,16 +41,12 @@ class LocalSparkCluster(numSlaves: Int, coresPerSlave: Int, memoryPerSlave: Int)
/* Start the Slaves */
for (slaveNum <- 1 to numSlaves) {
- /* We can pretend to test distributed stuff by giving the slaves distinct hostnames.
- All of 127/8 should be a loopback, we use 127.100.*.* in hopes that it is
- sufficiently distinctive. */
- val slaveIpAddress = "127.100.0." + (slaveNum % 256)
val (actorSystem, boundPort) =
- AkkaUtils.createActorSystem("sparkWorker" + slaveNum, slaveIpAddress, 0)
+ AkkaUtils.createActorSystem("sparkWorker" + slaveNum, localIpAddress, 0)
slaveActorSystems += actorSystem
val actor = actorSystem.actorOf(
- Props(new Worker(slaveIpAddress, boundPort, 0, coresPerSlave, memoryPerSlave, masterUrl)),
- name = "Worker")
+ Props(new Worker(localIpAddress, boundPort, 0, coresPerSlave, memoryPerSlave, masterUrl)),
+ name = "Worker")
slaveActors += actor
}
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 2c2cd0231b..2e7e868579 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -97,10 +97,10 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
exec.worker.removeExecutor(exec)
// Only retry certain number of times so we don't go into an infinite loop.
- if (jobInfo.incrementRetryCount <= JobState.MAX_NUM_RETRY) {
+ if (jobInfo.incrementRetryCount < JobState.MAX_NUM_RETRY) {
schedule()
} else {
- val e = new SparkException("Job %s wth ID %s failed %d times.".format(
+ val e = new SparkException("Job %s with ID %s failed %d times.".format(
jobInfo.desc.name, jobInfo.id, jobInfo.retryCount))
logError(e.getMessage, e)
throw e
diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
index 458ee2d665..a01774f511 100644
--- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
@@ -14,12 +14,15 @@ import cc.spray.typeconversion.SprayJsonSupport._
import spark.deploy._
import spark.deploy.JsonProtocol._
+/**
+ * Web UI server for the standalone master.
+ */
private[spark]
class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Directives {
val RESOURCE_DIR = "spark/deploy/master/webui"
val STATIC_RESOURCE_DIR = "spark/deploy/static"
- implicit val timeout = Timeout(1 seconds)
+ implicit val timeout = Timeout(10 seconds)
val handler = {
get {
@@ -76,5 +79,4 @@ class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Direct
getFromResourceDirectory(RESOURCE_DIR)
}
}
-
}
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index 0d1fe2a6b4..f5ff267d44 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -65,9 +65,9 @@ private[spark] class ExecutorRunner(
}
}
- /** Replace variables such as {{SLAVEID}} and {{CORES}} in a command argument passed to us */
+ /** Replace variables such as {{EXECUTOR_ID}} and {{CORES}} in a command argument passed to us */
def substituteVariables(argument: String): String = argument match {
- case "{{SLAVEID}}" => workerId
+ case "{{EXECUTOR_ID}}" => execId.toString
case "{{HOSTNAME}}" => hostname
case "{{CORES}}" => cores.toString
case other => other
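Note: the executor launch command is built from {{EXECUTOR_ID}}, {{HOSTNAME}} and {{CORES}} placeholders that the worker substitutes per argument. A self-contained sketch of that pattern-match substitution (the concrete values below are made up; the real method reads them from the ExecutorRunner's fields):

    // Replace known {{...}} placeholders in one command-line argument; pass others through.
    def substitute(arg: String, execId: Int, host: String, cores: Int): String = arg match {
      case "{{EXECUTOR_ID}}" => execId.toString
      case "{{HOSTNAME}}"    => host
      case "{{CORES}}"       => cores.toString
      case other             => other
    }

    val cmd = Seq("spark.executor.StandaloneExecutorBackend",
                  "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
    val rendered = cmd.map(substitute(_, 7, "worker-1.example.com", 4))
    // rendered == Seq("spark.executor.StandaloneExecutorBackend", "7", "worker-1.example.com", "4")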
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
index f9489d99fc..ef81f072a3 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
@@ -13,12 +13,15 @@ import cc.spray.typeconversion.SprayJsonSupport._
import spark.deploy.{WorkerState, RequestWorkerState}
import spark.deploy.JsonProtocol._
+/**
+ * Web UI server for the standalone worker.
+ */
private[spark]
class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Directives {
val RESOURCE_DIR = "spark/deploy/worker/webui"
val STATIC_RESOURCE_DIR = "spark/deploy/static"
- implicit val timeout = Timeout(1 seconds)
+ implicit val timeout = Timeout(10 seconds)
val handler = {
get {
@@ -50,5 +53,4 @@ class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Direct
getFromResourceDirectory(RESOURCE_DIR)
}
}
-
}
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 28d9d40d43..bd21ba719a 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -30,7 +30,7 @@ private[spark] class Executor extends Logging {
initLogging()
- def initialize(slaveHostname: String, properties: Seq[(String, String)]) {
+ def initialize(executorId: String, slaveHostname: String, properties: Seq[(String, String)]) {
// Make sure the local hostname we report matches the cluster scheduler's name for this host
Utils.setCustomHostname(slaveHostname)
@@ -64,7 +64,7 @@ private[spark] class Executor extends Logging {
)
// Initialize Spark environment (using system properties read above)
- env = SparkEnv.createFromSystemProperties(slaveHostname, 0, false, false)
+ env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
SparkEnv.set(env)
// Start worker thread pool
diff --git a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
index eeab3959c6..1ef88075ad 100644
--- a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
@@ -29,9 +29,10 @@ private[spark] class MesosExecutorBackend(executor: Executor)
executorInfo: ExecutorInfo,
frameworkInfo: FrameworkInfo,
slaveInfo: SlaveInfo) {
+ logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)
this.driver = driver
val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
- executor.initialize(slaveInfo.getHostname, properties)
+ executor.initialize(executorInfo.getExecutorId.getValue, slaveInfo.getHostname, properties)
}
override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
index a29bf974d2..50871802ea 100644
--- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
@@ -8,16 +8,16 @@ import akka.actor.{ActorRef, Actor, Props}
import java.util.concurrent.{TimeUnit, ThreadPoolExecutor, SynchronousQueue}
import akka.remote.RemoteClientLifeCycleEvent
import spark.scheduler.cluster._
-import spark.scheduler.cluster.RegisteredSlave
+import spark.scheduler.cluster.RegisteredExecutor
import spark.scheduler.cluster.LaunchTask
-import spark.scheduler.cluster.RegisterSlaveFailed
-import spark.scheduler.cluster.RegisterSlave
+import spark.scheduler.cluster.RegisterExecutorFailed
+import spark.scheduler.cluster.RegisterExecutor
private[spark] class StandaloneExecutorBackend(
executor: Executor,
masterUrl: String,
- slaveId: String,
+ executorId: String,
hostname: String,
cores: Int)
extends Actor
@@ -30,7 +30,7 @@ private[spark] class StandaloneExecutorBackend(
try {
logInfo("Connecting to master: " + masterUrl)
master = context.actorFor(masterUrl)
- master ! RegisterSlave(slaveId, hostname, cores)
+ master ! RegisterExecutor(executorId, hostname, cores)
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
context.watch(master) // Doesn't work with remote actors, but useful for testing
} catch {
@@ -41,11 +41,11 @@ private[spark] class StandaloneExecutorBackend(
}
override def receive = {
- case RegisteredSlave(sparkProperties) =>
+ case RegisteredExecutor(sparkProperties) =>
logInfo("Successfully registered with master")
- executor.initialize(hostname, sparkProperties)
+ executor.initialize(executorId, hostname, sparkProperties)
- case RegisterSlaveFailed(message) =>
+ case RegisterExecutorFailed(message) =>
logError("Slave registration failed: " + message)
System.exit(1)
@@ -55,24 +55,24 @@ private[spark] class StandaloneExecutorBackend(
}
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
- master ! StatusUpdate(slaveId, taskId, state, data)
+ master ! StatusUpdate(executorId, taskId, state, data)
}
}
private[spark] object StandaloneExecutorBackend {
- def run(masterUrl: String, slaveId: String, hostname: String, cores: Int) {
+ def run(masterUrl: String, executorId: String, hostname: String, cores: Int) {
// Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
// before getting started with all our system properties, etc
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0)
val actor = actorSystem.actorOf(
- Props(new StandaloneExecutorBackend(new Executor, masterUrl, slaveId, hostname, cores)),
+ Props(new StandaloneExecutorBackend(new Executor, masterUrl, executorId, hostname, cores)),
name = "Executor")
actorSystem.awaitTermination()
}
def main(args: Array[String]) {
if (args.length != 4) {
- System.err.println("Usage: StandaloneExecutorBackend <master> <slaveId> <hostname> <cores>")
+ System.err.println("Usage: StandaloneExecutorBackend <master> <executorId> <hostname> <cores>")
System.exit(1)
}
run(args(0), args(1), args(2), args(3).toInt)
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index f599eb00bd..bd541d4207 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -35,9 +35,9 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
eventQueue.put(CompletionEvent(task, reason, result, accumUpdates))
}
- // Called by TaskScheduler when a host fails.
- override def hostLost(host: String) {
- eventQueue.put(HostLost(host))
+ // Called by TaskScheduler when an executor fails.
+ override def executorLost(execId: String) {
+ eventQueue.put(ExecutorLost(execId))
}
// Called by TaskScheduler to cancel an entire TaskSet due to repeated failures.
@@ -72,7 +72,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
// For tracking failed nodes, we use the MapOutputTracker's generation number, which is
// sent with every task. When we detect a node failing, we note the current generation number
- // and failed host, increment it for new tasks, and use this to ignore stray ShuffleMapTask
+ // and failed executor, increment it for new tasks, and use this to ignore stray ShuffleMapTask
// results.
// TODO: Garbage collect information about failure generations when we know there are no more
// stray messages to detect.
@@ -108,7 +108,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
}
def clearCacheLocs() {
- cacheLocs.clear
+ cacheLocs.clear()
}
/**
@@ -271,8 +271,8 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
submitStage(finalStage)
}
- case HostLost(host) =>
- handleHostLost(host)
+ case ExecutorLost(execId) =>
+ handleExecutorLost(execId)
case completion: CompletionEvent =>
handleTaskCompletion(completion)
@@ -436,10 +436,10 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
case smt: ShuffleMapTask =>
val stage = idToStage(smt.stageId)
val status = event.result.asInstanceOf[MapStatus]
- val host = status.address.ip
- logInfo("ShuffleMapTask finished with host " + host)
- if (failedGeneration.contains(host) && smt.generation <= failedGeneration(host)) {
- logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + host)
+ val execId = status.location.executorId
+ logDebug("ShuffleMapTask finished on " + execId)
+ if (failedGeneration.contains(execId) && smt.generation <= failedGeneration(execId)) {
+ logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)
} else {
stage.addOutputLoc(smt.partition, status)
}
@@ -511,9 +511,9 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
// Remember that a fetch failed now; this is used to resubmit the broken
// stages later, after a small wait (to give other tasks the chance to fail)
lastFetchFailureTime = System.currentTimeMillis() // TODO: Use pluggable clock
- // TODO: mark the host as failed only if there were lots of fetch failures on it
+ // TODO: mark the executor as failed only if there were lots of fetch failures on it
if (bmAddress != null) {
- handleHostLost(bmAddress.ip, Some(task.generation))
+ handleExecutorLost(bmAddress.executorId, Some(task.generation))
}
case other =>
@@ -523,21 +523,21 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
}
/**
- * Responds to a host being lost. This is called inside the event loop so it assumes that it can
- * modify the scheduler's internal state. Use hostLost() to post a host lost event from outside.
+ * Responds to an executor being lost. This is called inside the event loop, so it assumes it can
+ * modify the scheduler's internal state. Use executorLost() to post a loss event from outside.
*
* Optionally the generation during which the failure was caught can be passed to avoid allowing
* stray fetch failures from possibly retriggering the detection of a node as lost.
*/
- def handleHostLost(host: String, maybeGeneration: Option[Long] = None) {
+ def handleExecutorLost(execId: String, maybeGeneration: Option[Long] = None) {
val currentGeneration = maybeGeneration.getOrElse(mapOutputTracker.getGeneration)
- if (!failedGeneration.contains(host) || failedGeneration(host) < currentGeneration) {
- failedGeneration(host) = currentGeneration
- logInfo("Host lost: " + host + " (generation " + currentGeneration + ")")
- env.blockManager.master.notifyADeadHost(host)
+ if (!failedGeneration.contains(execId) || failedGeneration(execId) < currentGeneration) {
+ failedGeneration(execId) = currentGeneration
+ logInfo("Executor lost: %s (generation %d)".format(execId, currentGeneration))
+ env.blockManager.master.removeExecutor(execId)
// TODO: This will be really slow if we keep accumulating shuffle map stages
for ((shuffleId, stage) <- shuffleToMapStage) {
- stage.removeOutputsOnHost(host)
+ stage.removeOutputsOnExecutor(execId)
val locs = stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray
mapOutputTracker.registerMapOutputs(shuffleId, locs, true)
}
@@ -546,7 +546,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
}
clearCacheLocs()
} else {
- logDebug("Additional host lost message for " + host +
+ logDebug("Additional executor lost message for " + execId +
"(generation " + currentGeneration + ")")
}
}
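Note: handleExecutorLost above records the MapOutputTracker generation at which an executor failed, so that stray ShuffleMapTask results produced before the failure can be ignored. A self-contained sketch of that generation-based filtering (executor IDs and generation numbers are illustrative):

    import scala.collection.mutable.HashMap

    val failedGeneration = new HashMap[String, Long]   // execId -> generation when it was lost

    def markFailed(execId: String, currentGeneration: Long) {
      if (!failedGeneration.contains(execId) || failedGeneration(execId) < currentGeneration) {
        failedGeneration(execId) = currentGeneration
      }
    }

    // A completion from execId is suspect if it was produced at or before the
    // generation in which that executor was declared lost.
    def isStale(execId: String, taskGeneration: Long): Boolean =
      failedGeneration.contains(execId) && taskGeneration <= failedGeneration(execId)

    markFailed("exec-5", 3L)
    isStale("exec-5", 2L)   // true: result produced before the executor was lost
    isStale("exec-5", 4L)   // false: result from a newer generation is trusted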
diff --git a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
index 3422a21d9d..b34fa78c07 100644
--- a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
@@ -28,7 +28,7 @@ private[spark] case class CompletionEvent(
accumUpdates: Map[Long, Any])
extends DAGSchedulerEvent
-private[spark] case class HostLost(host: String) extends DAGSchedulerEvent
+private[spark] case class ExecutorLost(execId: String) extends DAGSchedulerEvent
private[spark] case class TaskSetFailed(taskSet: TaskSet, reason: String) extends DAGSchedulerEvent
diff --git a/core/src/main/scala/spark/scheduler/MapStatus.scala b/core/src/main/scala/spark/scheduler/MapStatus.scala
index fae643f3a8..203abb917b 100644
--- a/core/src/main/scala/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/spark/scheduler/MapStatus.scala
@@ -8,19 +8,19 @@ import java.io.{ObjectOutput, ObjectInput, Externalizable}
* task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks.
* The map output sizes are compressed using MapOutputTracker.compressSize.
*/
-private[spark] class MapStatus(var address: BlockManagerId, var compressedSizes: Array[Byte])
+private[spark] class MapStatus(var location: BlockManagerId, var compressedSizes: Array[Byte])
extends Externalizable {
def this() = this(null, null) // For deserialization only
def writeExternal(out: ObjectOutput) {
- address.writeExternal(out)
+ location.writeExternal(out)
out.writeInt(compressedSizes.length)
out.write(compressedSizes)
}
def readExternal(in: ObjectInput) {
- address = BlockManagerId(in)
+ location = BlockManagerId(in)
compressedSizes = new Array[Byte](in.readInt())
in.readFully(compressedSizes)
}
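Note: MapStatus above hand-rolls its wire format via Externalizable (write the location, then a length-prefixed byte array of compressed sizes). A self-contained sketch of that length-prefixed technique with a simplified payload (SizesOnly is an illustrative class, not part of Spark):

    import java.io.{Externalizable, ObjectInput, ObjectOutput}

    class SizesOnly(var compressedSizes: Array[Byte]) extends Externalizable {
      def this() = this(null)  // no-arg constructor required for deserialization

      // Write the array length first so readExternal knows how many bytes to expect.
      def writeExternal(out: ObjectOutput) {
        out.writeInt(compressedSizes.length)
        out.write(compressedSizes)
      }

      def readExternal(in: ObjectInput) {
        compressedSizes = new Array[Byte](in.readInt())
        in.readFully(compressedSizes)
      }
    }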
diff --git a/core/src/main/scala/spark/scheduler/Stage.scala b/core/src/main/scala/spark/scheduler/Stage.scala
index 4846b66729..e9419728e3 100644
--- a/core/src/main/scala/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/spark/scheduler/Stage.scala
@@ -51,18 +51,18 @@ private[spark] class Stage(
def removeOutputLoc(partition: Int, bmAddress: BlockManagerId) {
val prevList = outputLocs(partition)
- val newList = prevList.filterNot(_.address == bmAddress)
+ val newList = prevList.filterNot(_.location == bmAddress)
outputLocs(partition) = newList
if (prevList != Nil && newList == Nil) {
numAvailableOutputs -= 1
}
}
- def removeOutputsOnHost(host: String) {
+ def removeOutputsOnExecutor(execId: String) {
var becameUnavailable = false
for (partition <- 0 until numPartitions) {
val prevList = outputLocs(partition)
- val newList = prevList.filterNot(_.address.ip == host)
+ val newList = prevList.filterNot(_.location.executorId == execId)
outputLocs(partition) = newList
if (prevList != Nil && newList == Nil) {
becameUnavailable = true
@@ -70,7 +70,8 @@ private[spark] class Stage(
}
}
if (becameUnavailable) {
- logInfo("%s is now unavailable on %s (%d/%d, %s)".format(this, host, numAvailableOutputs, numPartitions, isAvailable))
+ logInfo("%s is now unavailable on executor %s (%d/%d, %s)".format(
+ this, execId, numAvailableOutputs, numPartitions, isAvailable))
}
}
@@ -82,7 +83,7 @@ private[spark] class Stage(
def origin: String = rdd.origin
- override def toString = "Stage " + id // + ": [RDD = " + rdd.id + ", isShuffle = " + isShuffleMap + "]"
+ override def toString = "Stage " + id
override def hashCode(): Int = id
}
diff --git a/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala b/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
index fa4de15d0d..9fcef86e46 100644
--- a/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
+++ b/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
@@ -12,7 +12,7 @@ private[spark] trait TaskSchedulerListener {
def taskEnded(task: Task[_], reason: TaskEndReason, result: Any, accumUpdates: Map[Long, Any]): Unit
// A node was lost from the cluster.
- def hostLost(host: String): Unit
+ def executorLost(execId: String): Unit
// The TaskScheduler wants to abort an entire task set.
def taskSetFailed(taskSet: TaskSet, reason: String): Unit
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index a639b72795..0b4177805b 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -27,19 +27,20 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
val taskIdToTaskSetId = new HashMap[Long, String]
- val taskIdToSlaveId = new HashMap[Long, String]
+ val taskIdToExecutorId = new HashMap[Long, String]
val taskSetTaskIds = new HashMap[String, HashSet[Long]]
// Incrementing Mesos task IDs
val nextTaskId = new AtomicLong(0)
- // Which hosts in the cluster are alive (contains hostnames)
- val hostsAlive = new HashSet[String]
+ // Which executor IDs we have executors on
+ val activeExecutorIds = new HashSet[String]
- // Which slave IDs we have executors on
- val slaveIdsWithExecutors = new HashSet[String]
+ // The set of executors we have on each host; this is used to compute hostsAlive, which
+ // in turn is used to decide when we can attain data locality on a given host
+ val executorsByHost = new HashMap[String, HashSet[String]]
- val slaveIdToHost = new HashMap[String, String]
+ val executorIdToHost = new HashMap[String, String]
// JAR server, if any JARs were added by the user to the SparkContext
var jarServer: HttpServer = null
@@ -102,7 +103,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
activeTaskSets -= manager.taskSet.id
activeTaskSetsQueue -= manager
taskIdToTaskSetId --= taskSetTaskIds(manager.taskSet.id)
- taskIdToSlaveId --= taskSetTaskIds(manager.taskSet.id)
+ taskIdToExecutorId --= taskSetTaskIds(manager.taskSet.id)
taskSetTaskIds.remove(manager.taskSet.id)
}
}
@@ -117,8 +118,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
SparkEnv.set(sc.env)
// Mark each slave as alive and remember its hostname
for (o <- offers) {
- slaveIdToHost(o.slaveId) = o.hostname
- hostsAlive += o.hostname
+ executorIdToHost(o.executorId) = o.hostname
}
// Build a list of tasks to assign to each slave
val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
@@ -128,16 +128,20 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
do {
launchedTask = false
for (i <- 0 until offers.size) {
- val sid = offers(i).slaveId
+ val execId = offers(i).executorId
val host = offers(i).hostname
- manager.slaveOffer(sid, host, availableCpus(i)) match {
+ manager.slaveOffer(execId, host, availableCpus(i)) match {
case Some(task) =>
tasks(i) += task
val tid = task.taskId
taskIdToTaskSetId(tid) = manager.taskSet.id
taskSetTaskIds(manager.taskSet.id) += tid
- taskIdToSlaveId(tid) = sid
- slaveIdsWithExecutors += sid
+ taskIdToExecutorId(tid) = execId
+ activeExecutorIds += execId
+ if (!executorsByHost.contains(host)) {
+ executorsByHost(host) = new HashSet()
+ }
+ executorsByHost(host) += execId
availableCpus(i) -= 1
launchedTask = true
@@ -152,25 +156,21 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
var taskSetToUpdate: Option[TaskSetManager] = None
- var failedHost: Option[String] = None
+ var failedExecutor: Option[String] = None
var taskFailed = false
synchronized {
try {
- if (state == TaskState.LOST && taskIdToSlaveId.contains(tid)) {
- // We lost the executor on this slave, so remember that it's gone
- val slaveId = taskIdToSlaveId(tid)
- val host = slaveIdToHost(slaveId)
- if (hostsAlive.contains(host)) {
- slaveIdsWithExecutors -= slaveId
- hostsAlive -= host
- activeTaskSetsQueue.foreach(_.hostLost(host))
- failedHost = Some(host)
+ if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
+ // We lost this entire executor, so remember that it's gone
+ val execId = taskIdToExecutorId(tid)
+ if (activeExecutorIds.contains(execId)) {
+ removeExecutor(execId)
+ failedExecutor = Some(execId)
}
}
taskIdToTaskSetId.get(tid) match {
case Some(taskSetId) =>
if (activeTaskSets.contains(taskSetId)) {
- //activeTaskSets(taskSetId).statusUpdate(status)
taskSetToUpdate = Some(activeTaskSets(taskSetId))
}
if (TaskState.isFinished(state)) {
@@ -178,7 +178,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
if (taskSetTaskIds.contains(taskSetId)) {
taskSetTaskIds(taskSetId) -= tid
}
- taskIdToSlaveId.remove(tid)
+ taskIdToExecutorId.remove(tid)
}
if (state == TaskState.FAILED) {
taskFailed = true
@@ -190,12 +190,12 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
case e: Exception => logError("Exception in statusUpdate", e)
}
}
- // Update the task set and DAGScheduler without holding a lock on this, because that can deadlock
+ // Update the task set and DAGScheduler without holding a lock on this, since that can deadlock
if (taskSetToUpdate != None) {
taskSetToUpdate.get.statusUpdate(tid, state, serializedData)
}
- if (failedHost != None) {
- listener.hostLost(failedHost.get)
+ if (failedExecutor != None) {
+ listener.executorLost(failedExecutor.get)
backend.reviveOffers()
}
if (taskFailed) {
@@ -249,32 +249,42 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
}
- def slaveLost(slaveId: String, reason: ExecutorLossReason) {
- var failedHost: Option[String] = None
+ def executorLost(executorId: String, reason: ExecutorLossReason) {
+ var failedExecutor: Option[String] = None
synchronized {
- slaveIdToHost.get(slaveId) match {
- case Some(host) =>
- if (hostsAlive.contains(host)) {
- logError("Lost an executor on " + host + ": " + reason)
- slaveIdsWithExecutors -= slaveId
- hostsAlive -= host
- activeTaskSetsQueue.foreach(_.hostLost(host))
- failedHost = Some(host)
- } else {
- // We may get multiple slaveLost() calls with different loss reasons. For example, one
- // may be triggered by a dropped connection from the slave while another may be a report
- // of executor termination from Mesos. We produce log messages for both so we eventually
- // report the termination reason.
- logError("Lost an executor on " + host + " (already removed): " + reason)
- }
- case None =>
- // We were told about a slave being lost before we could even allocate work to it
- logError("Lost slave " + slaveId + " (no work assigned yet)")
+ if (activeExecutorIds.contains(executorId)) {
+ val host = executorIdToHost(executorId)
+ logError("Lost executor %s on %s: %s".format(executorId, host, reason))
+ removeExecutor(executorId)
+ failedExecutor = Some(executorId)
+ } else {
+ // We may get multiple executorLost() calls with different loss reasons. For example, one
+ // may be triggered by a dropped connection from the slave while another may be a report
+ // of executor termination from Mesos. We produce log messages for both so we eventually
+ // report the termination reason.
+ logError("Lost an executor " + executorId + " (already removed): " + reason)
}
}
- if (failedHost != None) {
- listener.hostLost(failedHost.get)
+ // Call listener.executorLost without holding the lock on this to prevent deadlock
+ if (failedExecutor != None) {
+ listener.executorLost(failedExecutor.get)
backend.reviveOffers()
}
}
+
+ /** Get a list of hosts that currently have executors */
+ def hostsAlive: scala.collection.Set[String] = executorsByHost.keySet
+
+ /** Remove an executor from all our data structures and mark it as lost */
+ private def removeExecutor(executorId: String) {
+ activeExecutorIds -= executorId
+ val host = executorIdToHost(executorId)
+ val execs = executorsByHost.getOrElse(host, new HashSet)
+ execs -= executorId
+ if (execs.isEmpty) {
+ executorsByHost -= host
+ }
+ executorIdToHost -= executorId
+ activeTaskSetsQueue.foreach(_.executorLost(executorId, host))
+ }
}
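Note: the new executorsByHost map above lets hostsAlive be derived from the key set, and removeExecutor drops a host only once its last executor is gone. A self-contained sketch of that bookkeeping (host and executor names are made up for illustration):

    import scala.collection.mutable.{HashMap, HashSet}

    val executorsByHost = new HashMap[String, HashSet[String]]

    // Register an executor on a host (mirrors the resourceOffers path).
    def addExecutor(execId: String, host: String) {
      executorsByHost.getOrElseUpdate(host, new HashSet()) += execId
    }

    // Remove an executor; drop the host entirely once its last executor is gone,
    // so hostsAlive (the key set) only reports hosts that still have executors.
    def removeExecutor(execId: String, host: String) {
      for (execs <- executorsByHost.get(host)) {
        execs -= execId
        if (execs.isEmpty) executorsByHost -= host
      }
    }

    def hostsAlive: scala.collection.Set[String] = executorsByHost.keySet

    addExecutor("exec-1", "hostA"); addExecutor("exec-2", "hostA")
    removeExecutor("exec-1", "hostA")
    // hostsAlive still contains "hostA"; after removing "exec-2" it would not.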
diff --git a/core/src/main/scala/spark/scheduler/cluster/SlaveResources.scala b/core/src/main/scala/spark/scheduler/cluster/SlaveResources.scala
deleted file mode 100644
index 96ebaa4601..0000000000
--- a/core/src/main/scala/spark/scheduler/cluster/SlaveResources.scala
+++ /dev/null
@@ -1,4 +0,0 @@
-package spark.scheduler.cluster
-
-private[spark]
-class SlaveResources(val slaveId: String, val hostname: String, val coresFree: Int) {}
diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 4f82cd96dd..6dd3ae003d 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -19,7 +19,6 @@ private[spark] class SparkDeploySchedulerBackend(
var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
- val executorIdToSlaveId = new HashMap[String, String]
// Memory used by each executor (in megabytes)
val executorMemory = {
@@ -37,7 +36,7 @@ private[spark] class SparkDeploySchedulerBackend(
val masterUrl = "akka://spark@%s:%s/user/%s".format(
System.getProperty("spark.master.host"), System.getProperty("spark.master.port"),
StandaloneSchedulerBackend.ACTOR_NAME)
- val args = Seq(masterUrl, "{{SLAVEID}}", "{{HOSTNAME}}", "{{CORES}}")
+ val args = Seq(masterUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
val command = Command("spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)
val sparkHome = sc.getSparkHome().getOrElse(throw new IllegalArgumentException("must supply spark home for spark standalone"))
val jobDesc = new JobDescription(jobName, maxCores, executorMemory, command, sparkHome)
@@ -47,7 +46,7 @@ private[spark] class SparkDeploySchedulerBackend(
}
override def stop() {
- stopping = true;
+ stopping = true
super.stop()
client.stop()
if (shutdownCallback != null) {
@@ -67,23 +66,16 @@ private[spark] class SparkDeploySchedulerBackend(
}
def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int) {
- executorIdToSlaveId += id -> workerId
logInfo("Granted executor ID %s on host %s with %d cores, %s RAM".format(
id, host, cores, Utils.memoryMegabytesToString(memory)))
}
- def executorRemoved(id: String, message: String, exitStatus: Option[Int]) {
+ def executorRemoved(executorId: String, message: String, exitStatus: Option[Int]) {
val reason: ExecutorLossReason = exitStatus match {
case Some(code) => ExecutorExited(code)
case None => SlaveLost(message)
}
- logInfo("Executor %s removed: %s".format(id, message))
- executorIdToSlaveId.get(id) match {
- case Some(slaveId) =>
- executorIdToSlaveId.remove(id)
- scheduler.slaveLost(slaveId, reason)
- case None =>
- logInfo("No slave ID known for executor %s".format(id))
- }
+ logInfo("Executor %s removed: %s".format(executorId, message))
+ scheduler.executorLost(executorId, reason)
}
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
index 1386cd9d44..c68f15bdfa 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
@@ -11,24 +11,26 @@ private[spark]
case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage
private[spark]
-case class RegisteredSlave(sparkProperties: Seq[(String, String)]) extends StandaloneClusterMessage
+case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
+ extends StandaloneClusterMessage
private[spark]
-case class RegisterSlaveFailed(message: String) extends StandaloneClusterMessage
+case class RegisterExecutorFailed(message: String) extends StandaloneClusterMessage
-// Slaves to master
+// Executors to master
private[spark]
-case class RegisterSlave(slaveId: String, host: String, cores: Int) extends StandaloneClusterMessage
+case class RegisterExecutor(executorId: String, host: String, cores: Int)
+ extends StandaloneClusterMessage
private[spark]
-case class StatusUpdate(slaveId: String, taskId: Long, state: TaskState, data: SerializableBuffer)
+case class StatusUpdate(executorId: String, taskId: Long, state: TaskState, data: SerializableBuffer)
extends StandaloneClusterMessage
private[spark]
object StatusUpdate {
/** Alternate factory method that takes a ByteBuffer directly for the data field */
- def apply(slaveId: String, taskId: Long, state: TaskState, data: ByteBuffer): StatusUpdate = {
- StatusUpdate(slaveId, taskId, state, new SerializableBuffer(data))
+ def apply(executorId: String, taskId: Long, state: TaskState, data: ByteBuffer): StatusUpdate = {
+ StatusUpdate(executorId, taskId, state, new SerializableBuffer(data))
}
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index eeaae23dc8..69822f568c 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -24,12 +24,12 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
var totalCoreCount = new AtomicInteger(0)
class MasterActor(sparkProperties: Seq[(String, String)]) extends Actor {
- val slaveActor = new HashMap[String, ActorRef]
- val slaveAddress = new HashMap[String, Address]
- val slaveHost = new HashMap[String, String]
+ val executorActor = new HashMap[String, ActorRef]
+ val executorAddress = new HashMap[String, Address]
+ val executorHost = new HashMap[String, String]
val freeCores = new HashMap[String, Int]
- val actorToSlaveId = new HashMap[ActorRef, String]
- val addressToSlaveId = new HashMap[Address, String]
+ val actorToExecutorId = new HashMap[ActorRef, String]
+ val addressToExecutorId = new HashMap[Address, String]
override def preStart() {
// Listen for remote client disconnection events, since they don't go through Akka's watch()
@@ -37,28 +37,28 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
}
def receive = {
- case RegisterSlave(slaveId, host, cores) =>
- if (slaveActor.contains(slaveId)) {
- sender ! RegisterSlaveFailed("Duplicate slave ID: " + slaveId)
+ case RegisterExecutor(executorId, host, cores) =>
+ if (executorActor.contains(executorId)) {
+ sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
} else {
- logInfo("Registered slave: " + sender + " with ID " + slaveId)
- sender ! RegisteredSlave(sparkProperties)
+ logInfo("Registered executor: " + sender + " with ID " + executorId)
+ sender ! RegisteredExecutor(sparkProperties)
context.watch(sender)
- slaveActor(slaveId) = sender
- slaveHost(slaveId) = host
- freeCores(slaveId) = cores
- slaveAddress(slaveId) = sender.path.address
- actorToSlaveId(sender) = slaveId
- addressToSlaveId(sender.path.address) = slaveId
+ executorActor(executorId) = sender
+ executorHost(executorId) = host
+ freeCores(executorId) = cores
+ executorAddress(executorId) = sender.path.address
+ actorToExecutorId(sender) = executorId
+ addressToExecutorId(sender.path.address) = executorId
totalCoreCount.addAndGet(cores)
makeOffers()
}
- case StatusUpdate(slaveId, taskId, state, data) =>
+ case StatusUpdate(executorId, taskId, state, data) =>
scheduler.statusUpdate(taskId, state, data.value)
if (TaskState.isFinished(state)) {
- freeCores(slaveId) += 1
- makeOffers(slaveId)
+ freeCores(executorId) += 1
+ makeOffers(executorId)
}
case ReviveOffers =>
@@ -69,47 +69,47 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
context.stop(self)
case Terminated(actor) =>
- actorToSlaveId.get(actor).foreach(removeSlave(_, "Akka actor terminated"))
+ actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated"))
case RemoteClientDisconnected(transport, address) =>
- addressToSlaveId.get(address).foreach(removeSlave(_, "remote Akka client disconnected"))
+ addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disconnected"))
case RemoteClientShutdown(transport, address) =>
- addressToSlaveId.get(address).foreach(removeSlave(_, "remote Akka client shutdown"))
+ addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client shutdown"))
}
- // Make fake resource offers on all slaves
+ // Make fake resource offers on all executors
def makeOffers() {
launchTasks(scheduler.resourceOffers(
- slaveHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
+ executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
}
- // Make fake resource offers on just one slave
- def makeOffers(slaveId: String) {
+ // Make fake resource offers on just one executor
+ def makeOffers(executorId: String) {
launchTasks(scheduler.resourceOffers(
- Seq(new WorkerOffer(slaveId, slaveHost(slaveId), freeCores(slaveId)))))
+ Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))
}
// Launch tasks returned by a set of resource offers
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
- freeCores(task.slaveId) -= 1
- slaveActor(task.slaveId) ! LaunchTask(task)
+ freeCores(task.executorId) -= 1
+ executorActor(task.executorId) ! LaunchTask(task)
}
}
// Remove a disconnected slave from the cluster
- def removeSlave(slaveId: String, reason: String) {
- logInfo("Slave " + slaveId + " disconnected, so removing it")
- val numCores = freeCores(slaveId)
- actorToSlaveId -= slaveActor(slaveId)
- addressToSlaveId -= slaveAddress(slaveId)
- slaveActor -= slaveId
- slaveHost -= slaveId
- freeCores -= slaveId
- slaveHost -= slaveId
+ def removeExecutor(executorId: String, reason: String) {
+ logInfo("Slave " + executorId + " disconnected, so removing it")
+ val numCores = freeCores(executorId)
+ actorToExecutorId -= executorActor(executorId)
+ addressToExecutorId -= executorAddress(executorId)
+ executorActor -= executorId
+ executorHost -= executorId
+ freeCores -= executorId
+ executorHost -= executorId
totalCoreCount.addAndGet(-numCores)
- scheduler.slaveLost(slaveId, SlaveLost(reason))
+ scheduler.executorLost(executorId, SlaveLost(reason))
}
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
index aa097fd3a2..b41e951be9 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
@@ -5,7 +5,7 @@ import spark.util.SerializableBuffer
private[spark] class TaskDescription(
val taskId: Long,
- val slaveId: String,
+ val executorId: String,
val name: String,
_serializedTask: ByteBuffer)
extends Serializable {
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala b/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala
index ca84503780..0f975ce1eb 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala
@@ -4,7 +4,12 @@ package spark.scheduler.cluster
* Information about a running task attempt inside a TaskSet.
*/
private[spark]
-class TaskInfo(val taskId: Long, val index: Int, val launchTime: Long, val host: String) {
+class TaskInfo(
+ val taskId: Long,
+ val index: Int,
+ val launchTime: Long,
+ val executorId: String,
+ val host: String) {
var finishTime: Long = 0
var failed = false
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index a089b71644..26201ad0dd 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -138,10 +138,11 @@ private[spark] class TaskSetManager(
// attempt running on this host, in case the host is slow. In addition, if localOnly is set, the
// task must have a preference for this host (or no preferred locations at all).
def findSpeculativeTask(host: String, localOnly: Boolean): Option[Int] = {
+ val hostsAlive = sched.hostsAlive
speculatableTasks.retain(index => !finished(index)) // Remove finished tasks from set
val localTask = speculatableTasks.find {
index =>
- val locations = tasks(index).preferredLocations.toSet & sched.hostsAlive
+ val locations = tasks(index).preferredLocations.toSet & hostsAlive
val attemptLocs = taskAttempts(index).map(_.host)
(locations.size == 0 || locations.contains(host)) && !attemptLocs.contains(host)
}
@@ -189,7 +190,7 @@ private[spark] class TaskSetManager(
}
// Respond to an offer of a single slave from the scheduler by finding a task
- def slaveOffer(slaveId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
+ def slaveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) {
val time = System.currentTimeMillis
val localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT)
@@ -206,11 +207,11 @@ private[spark] class TaskSetManager(
} else {
"non-preferred, not one of " + task.preferredLocations.mkString(", ")
}
- logInfo("Starting task %s:%d as TID %s on slave %s: %s (%s)".format(
- taskSet.id, index, taskId, slaveId, host, prefStr))
+ logInfo("Starting task %s:%d as TID %s on executor %s: %s (%s)".format(
+ taskSet.id, index, taskId, execId, host, prefStr))
// Do various bookkeeping
copiesRunning(index) += 1
- val info = new TaskInfo(taskId, index, time, host)
+ val info = new TaskInfo(taskId, index, time, execId, host)
taskInfos(taskId) = info
taskAttempts(index) = info :: taskAttempts(index)
if (preferred) {
@@ -224,7 +225,7 @@ private[spark] class TaskSetManager(
logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
taskSet.id, index, serializedTask.limit, timeTaken))
val taskName = "task %s:%d".format(taskSet.id, index)
- return Some(new TaskDescription(taskId, slaveId, taskName, serializedTask))
+ return Some(new TaskDescription(taskId, execId, taskName, serializedTask))
}
case _ =>
}
@@ -356,19 +357,22 @@ private[spark] class TaskSetManager(
sched.taskSetFinished(this)
}
- def hostLost(hostname: String) {
- logInfo("Re-queueing tasks for " + hostname + " from TaskSet " + taskSet.id)
- // If some task has preferred locations only on hostname, put it in the no-prefs list
- // to avoid the wait from delay scheduling
- for (index <- getPendingTasksForHost(hostname)) {
- val newLocs = tasks(index).preferredLocations.toSet & sched.hostsAlive
- if (newLocs.isEmpty) {
- pendingTasksWithNoPrefs += index
+ def executorLost(execId: String, hostname: String) {
+ logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id)
+ val newHostsAlive = sched.hostsAlive
+ // If some task has preferred locations only on hostname, and there are no more executors there,
+ // put it in the no-prefs list to avoid the wait from delay scheduling
+ if (!newHostsAlive.contains(hostname)) {
+ for (index <- getPendingTasksForHost(hostname)) {
+ val newLocs = tasks(index).preferredLocations.toSet & newHostsAlive
+ if (newLocs.isEmpty) {
+ pendingTasksWithNoPrefs += index
+ }
}
}
- // Re-enqueue any tasks that ran on the failed host if this is a shuffle map stage
+ // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage
if (tasks(0).isInstanceOf[ShuffleMapTask]) {
- for ((tid, info) <- taskInfos if info.host == hostname) {
+ for ((tid, info) <- taskInfos if info.executorId == execId) {
val index = taskInfos(tid).index
if (finished(index)) {
finished(index) = false
@@ -382,7 +386,7 @@ private[spark] class TaskSetManager(
}
}
// Also re-enqueue any tasks that were running on the node
- for ((tid, info) <- taskInfos if info.running && info.host == hostname) {
+ for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
taskLost(tid, TaskState.KILLED, null)
}
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala b/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala
index 6b919d68b2..3c3afcbb14 100644
--- a/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala
@@ -1,8 +1,8 @@
package spark.scheduler.cluster
/**
- * Represents free resources available on a worker node.
+ * Represents free resources available on an executor.
*/
private[spark]
-class WorkerOffer(val slaveId: String, val hostname: String, val cores: Int) {
+class WorkerOffer(val executorId: String, val hostname: String, val cores: Int) {
}
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
index 2989e31f5e..f3467db86b 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -268,7 +268,7 @@ private[spark] class MesosSchedulerBackend(
synchronized {
slaveIdsWithExecutors -= slaveId.getValue
}
- scheduler.slaveLost(slaveId.getValue, reason)
+ scheduler.executorLost(slaveId.getValue, reason)
}
override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 19d35b8667..1215d5f5c8 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -30,6 +30,7 @@ extends Exception(message)
private[spark]
class BlockManager(
+ executorId: String,
actorSystem: ActorSystem,
val master: BlockManagerMaster,
val serializer: Serializer,
@@ -68,8 +69,8 @@ class BlockManager(
val connectionManager = new ConnectionManager(0)
implicit val futureExecContext = connectionManager.futureExecContext
- val connectionManagerId = connectionManager.id
- val blockManagerId = BlockManagerId(connectionManagerId.host, connectionManagerId.port)
+ val blockManagerId = BlockManagerId(
+ executorId, connectionManager.id.host, connectionManager.id.port)
// Max megabytes of data to keep in flight per reducer (to avoid over-allocating memory
// for receiving shuffle outputs)
@@ -109,8 +110,9 @@ class BlockManager(
/**
* Construct a BlockManager with a memory limit set based on system properties.
*/
- def this(actorSystem: ActorSystem, master: BlockManagerMaster, serializer: Serializer) = {
- this(actorSystem, master, serializer, BlockManager.getMaxMemoryFromSystemProperties)
+ def this(execId: String, actorSystem: ActorSystem, master: BlockManagerMaster,
+ serializer: Serializer) = {
+ this(execId, actorSystem, master, serializer, BlockManager.getMaxMemoryFromSystemProperties)
}
/**
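
A hedged construction sketch under the new signature; "<driver>" is the ID the driver-side manager uses (see the tests further down), and actorSystem, blockManagerMaster and serializer are assumed to already exist in the caller:

  val driverStore   = new BlockManager("<driver>", actorSystem, blockManagerMaster, serializer)
  // or with an explicit memory limit, as ThreadingTest does below:
  val executorStore = new BlockManager("exec-1", actorSystem, blockManagerMaster, serializer, 1024 * 1024)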
diff --git a/core/src/main/scala/spark/storage/BlockManagerId.scala b/core/src/main/scala/spark/storage/BlockManagerId.scala
index abb8b45a1f..f2f1e77d41 100644
--- a/core/src/main/scala/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerId.scala
@@ -7,27 +7,32 @@ import java.util.concurrent.ConcurrentHashMap
 * This class represents a unique identifier for a BlockManager.
 * The first 2 constructors of this class are made private to ensure that
* BlockManagerId objects can be created only using the factory method in
- * [[spark.storage.BlockManager$]]. This allows de-duplication of id objects.
+ * [[spark.storage.BlockManager$]]. This allows de-duplication of ID objects.
* Also, constructor parameters are private to ensure that parameters cannot
* be modified from outside this class.
*/
private[spark] class BlockManagerId private (
+ private var executorId_ : String,
private var ip_ : String,
private var port_ : Int
) extends Externalizable {
- private def this() = this(null, 0) // For deserialization only
+ private def this() = this(null, null, 0) // For deserialization only
- def ip = ip_
+ def executorId: String = executorId_
- def port = port_
+ def ip: String = ip_
+
+ def port: Int = port_
override def writeExternal(out: ObjectOutput) {
+ out.writeUTF(executorId_)
out.writeUTF(ip_)
out.writeInt(port_)
}
override def readExternal(in: ObjectInput) {
+ executorId_ = in.readUTF()
ip_ = in.readUTF()
port_ = in.readInt()
}
@@ -35,21 +40,23 @@ private[spark] class BlockManagerId private (
@throws(classOf[IOException])
private def readResolve(): Object = BlockManagerId.getCachedBlockManagerId(this)
- override def toString = "BlockManagerId(" + ip + ", " + port + ")"
+ override def toString = "BlockManagerId(%s, %s, %d)".format(executorId, ip, port)
- override def hashCode = ip.hashCode * 41 + port
+ override def hashCode: Int = (executorId.hashCode * 41 + ip.hashCode) * 41 + port
override def equals(that: Any) = that match {
- case id: BlockManagerId => port == id.port && ip == id.ip
- case _ => false
+ case id: BlockManagerId =>
+ executorId == id.executorId && port == id.port && ip == id.ip
+ case _ =>
+ false
}
}
private[spark] object BlockManagerId {
- def apply(ip: String, port: Int) =
- getCachedBlockManagerId(new BlockManagerId(ip, port))
+ def apply(execId: String, ip: String, port: Int) =
+ getCachedBlockManagerId(new BlockManagerId(execId, ip, port))
def apply(in: ObjectInput) = {
val obj = new BlockManagerId()
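
Usage of the three-argument factory and its de-duplication, mirroring the "BlockManagerId object caching" test further down (host and port values are made up):

  val id1 = BlockManagerId("exec-1", "10.0.0.5", 7077)
  val id2 = BlockManagerId("exec-1", "10.0.0.5", 7077)
  assert(id2 eq id1)   // getCachedBlockManagerId returns the same instance
  println(id1)         // BlockManagerId(exec-1, 10.0.0.5, 7077)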
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 937115e92c..55ff1dde9c 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -24,7 +24,7 @@ private[spark] class BlockManagerMaster(
masterPort: Int)
extends Logging {
- val AKKA_RETRY_ATTEMPS: Int = System.getProperty("spark.akka.num.retries", "3").toInt
+ val AKKA_RETRY_ATTEMPTS: Int = System.getProperty("spark.akka.num.retries", "3").toInt
val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "3000").toInt
val MASTER_AKKA_ACTOR_NAME = "BlockMasterManager"
@@ -45,10 +45,10 @@ private[spark] class BlockManagerMaster(
}
}
- /** Remove a dead host from the master actor. This is only called on the master side. */
- def notifyADeadHost(host: String) {
- tell(RemoveHost(host))
- logInfo("Removed " + host + " successfully in notifyADeadHost")
+ /** Remove a dead executor from the master actor. This is only called on the master side. */
+ def removeExecutor(execId: String) {
+ tell(RemoveExecutor(execId))
+ logInfo("Removed " + execId + " successfully in removeExecutor")
}
/**
@@ -146,7 +146,7 @@ private[spark] class BlockManagerMaster(
}
var attempts = 0
var lastException: Exception = null
- while (attempts < AKKA_RETRY_ATTEMPS) {
+ while (attempts < AKKA_RETRY_ATTEMPTS) {
attempts += 1
try {
val future = masterActor.ask(message)(timeout)
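
Driver-side usage of the renamed call, taken from the test changes further down (store is a BlockManager held by the caller):

  // Previously: master.notifyADeadHost(store.blockManagerId.ip)
  master.removeExecutor(store.blockManagerId.executorId)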
diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
index b31b6286d3..f88517f1a3 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
@@ -23,9 +23,8 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
private val blockManagerInfo =
new HashMap[BlockManagerId, BlockManagerMasterActor.BlockManagerInfo]
- // Mapping from host name to block manager id. We allow multiple block managers
- // on the same host name (ip).
- private val blockManagerIdByHost = new HashMap[String, ArrayBuffer[BlockManagerId]]
+ // Mapping from executor ID to block manager ID.
+ private val blockManagerIdByExecutor = new HashMap[String, BlockManagerId]
// Mapping from block id to the set of block managers that have the block.
private val blockLocations = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]]
@@ -74,8 +73,8 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
case RemoveBlock(blockId) =>
removeBlock(blockId)
- case RemoveHost(host) =>
- removeHost(host)
+ case RemoveExecutor(execId) =>
+ removeExecutor(execId)
sender ! true
case StopBlockManagerMaster =>
@@ -99,16 +98,12 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
def removeBlockManager(blockManagerId: BlockManagerId) {
val info = blockManagerInfo(blockManagerId)
- // Remove the block manager from blockManagerIdByHost. If the list of block
- // managers belonging to the IP is empty, remove the entry from the hash map.
- blockManagerIdByHost.get(blockManagerId.ip).foreach { managers: ArrayBuffer[BlockManagerId] =>
- managers -= blockManagerId
- if (managers.size == 0) blockManagerIdByHost.remove(blockManagerId.ip)
- }
+ // Remove the block manager from blockManagerIdByExecutor.
+ blockManagerIdByExecutor -= blockManagerId.executorId
// Remove it from blockManagerInfo and remove all the blocks.
blockManagerInfo.remove(blockManagerId)
- var iterator = info.blocks.keySet.iterator
+ val iterator = info.blocks.keySet.iterator
while (iterator.hasNext) {
val blockId = iterator.next
val locations = blockLocations.get(blockId)._2
@@ -133,17 +128,15 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
toRemove.foreach(removeBlockManager)
}
- def removeHost(host: String) {
- logInfo("Trying to remove the host: " + host + " from BlockManagerMaster.")
- logInfo("Previous hosts: " + blockManagerInfo.keySet.toSeq)
- blockManagerIdByHost.get(host).foreach(_.foreach(removeBlockManager))
- logInfo("Current hosts: " + blockManagerInfo.keySet.toSeq)
+ def removeExecutor(execId: String) {
+ logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.")
+ blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
sender ! true
}
def heartBeat(blockManagerId: BlockManagerId) {
if (!blockManagerInfo.contains(blockManagerId)) {
- if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
+ if (blockManagerId.executorId == "<driver>" && !isLocal) {
sender ! true
} else {
sender ! false
@@ -188,24 +181,20 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
sender ! res
}
- private def register(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
- val startTimeMs = System.currentTimeMillis()
- val tmp = " " + blockManagerId + " "
-
- if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
- logInfo("Got Register Msg from master node, don't register it")
- } else if (!blockManagerInfo.contains(blockManagerId)) {
- blockManagerIdByHost.get(blockManagerId.ip) match {
- case Some(managers) =>
- // A block manager of the same host name already exists.
- logInfo("Got another registration for host " + blockManagerId)
- managers += blockManagerId
+ private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
+ if (id.executorId == "<driver>" && !isLocal) {
+ // Got a register message from the master node; don't register it
+ } else if (!blockManagerInfo.contains(id)) {
+ blockManagerIdByExecutor.get(id.executorId) match {
+ case Some(manager) =>
+ // A block manager is already registered for this executor ID
+ logError("Got two different block manager registrations on " + id.executorId)
+ System.exit(1)
case None =>
- blockManagerIdByHost += (blockManagerId.ip -> ArrayBuffer(blockManagerId))
+ blockManagerIdByExecutor(id.executorId) = id
}
-
- blockManagerInfo += (blockManagerId -> new BlockManagerMasterActor.BlockManagerInfo(
- blockManagerId, System.currentTimeMillis(), maxMemSize, slaveActor))
+ blockManagerInfo(id) = new BlockManagerMasterActor.BlockManagerInfo(
+ id, System.currentTimeMillis(), maxMemSize, slaveActor)
}
sender ! true
}
@@ -217,11 +206,8 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
memSize: Long,
diskSize: Long) {
- val startTimeMs = System.currentTimeMillis()
- val tmp = " " + blockManagerId + " " + blockId + " "
-
if (!blockManagerInfo.contains(blockManagerId)) {
- if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
+ if (blockManagerId.executorId == "<driver>" && !isLocal) {
// We intentionally do not register the master (except in local mode),
// so we should not indicate failure.
sender ! true
@@ -353,8 +339,8 @@ object BlockManagerMasterActor {
_lastSeenMs = System.currentTimeMillis()
}
- def updateBlockInfo(blockId: String, storageLevel: StorageLevel, memSize: Long, diskSize: Long)
- : Unit = synchronized {
+ def updateBlockInfo(blockId: String, storageLevel: StorageLevel, memSize: Long,
+ diskSize: Long) {
updateLastSeenMs()
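
A minimal sketch of the one-to-one executor registration check above, reduced to a standalone snippet (the map values are simplified to strings; the real actor stores BlockManagerId values and exits the JVM on conflict):

  import scala.collection.mutable.HashMap

  val blockManagerIdByExecutor = new HashMap[String, String]()

  def registerSketch(execId: String, blockManagerId: String) {
    blockManagerIdByExecutor.get(execId) match {
      case Some(existing) =>
        // Two different block managers on one executor is a fatal inconsistency.
        sys.error("Got two different block manager registrations on " + execId)
      case None =>
        blockManagerIdByExecutor(execId) = blockManagerId
    }
  }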
diff --git a/core/src/main/scala/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
index 3d03ff3a93..1494f90103 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
@@ -88,7 +88,7 @@ private[spark]
case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
private[spark]
-case class RemoveHost(host: String) extends ToBlockManagerMaster
+case class RemoveExecutor(execId: String) extends ToBlockManagerMaster
private[spark]
case object StopBlockManagerMaster extends ToBlockManagerMaster
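
Assumed usage of the new message, matching the BlockManagerMaster change above (tell is that class's ask-and-check helper):

  tell(RemoveExecutor("exec-3"))   // replaces tell(RemoveHost(host))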
diff --git a/core/src/main/scala/spark/storage/BlockManagerUI.scala b/core/src/main/scala/spark/storage/BlockManagerUI.scala
index 1003cc7a61..eda320fa47 100644
--- a/core/src/main/scala/spark/storage/BlockManagerUI.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerUI.scala
@@ -1,29 +1,41 @@
package spark.storage
import akka.actor.{ActorRef, ActorSystem}
-import akka.dispatch.Await
import akka.pattern.ask
import akka.util.Timeout
import akka.util.duration._
-import cc.spray.Directives
import cc.spray.directives._
import cc.spray.typeconversion.TwirlSupport._
+import cc.spray.Directives
import scala.collection.mutable.ArrayBuffer
-import spark.{Logging, SparkContext, SparkEnv}
+import spark.{Logging, SparkContext}
import spark.util.AkkaUtils
+import spark.Utils
+/**
+ * Web UI server for the BlockManager inside each SparkContext.
+ */
private[spark]
-object BlockManagerUI extends Logging {
+class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef, sc: SparkContext)
+ extends Directives with Logging {
+
+ val STATIC_RESOURCE_DIR = "spark/deploy/static"
+
+ implicit val timeout = Timeout(10 seconds)
- /* Starts the Web interface for the BlockManager */
- def start(actorSystem : ActorSystem, masterActor: ActorRef, sc: SparkContext) {
- val webUIDirectives = new BlockManagerUIDirectives(actorSystem, masterActor, sc)
+ /** Start an HTTP server to run the web interface */
+ def start() {
try {
- logInfo("Starting BlockManager WebUI.")
- val port = Option(System.getenv("BLOCKMANAGER_UI_PORT")).getOrElse("9080").toInt
- AkkaUtils.startSprayServer(actorSystem, "0.0.0.0", port,
- webUIDirectives.handler, "BlockManagerHTTPServer")
+ val port = if (System.getProperty("spark.ui.port") != null) {
+ System.getProperty("spark.ui.port").toInt
+ } else {
+ // TODO: Unfortunately, it's not possible to pass port 0 to spray and figure out which
+ // random port it bound to, so we have to try to find a local one by creating a socket.
+ Utils.findFreePort()
+ }
+ AkkaUtils.startSprayServer(actorSystem, "0.0.0.0", port, handler, "BlockManagerHTTPServer")
+ logInfo("Started BlockManager web UI at http://%s:%d".format(Utils.localHostName(), port))
} catch {
case e: Exception =>
logError("Failed to create BlockManager WebUI", e)
@@ -31,58 +43,43 @@ object BlockManagerUI extends Logging {
}
}
-}
-
-
-private[spark]
-class BlockManagerUIDirectives(val actorSystem: ActorSystem, master: ActorRef,
- sc: SparkContext) extends Directives {
-
- val STATIC_RESOURCE_DIR = "spark/deploy/static"
- implicit val timeout = Timeout(1 seconds)
-
val handler = {
-
- get { path("") { completeWith {
- // Request the current storage status from the Master
- val future = master ? GetStorageStatus
- future.map { status =>
- val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray
-
- // Calculate macro-level statistics
- val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
- val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
- val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
- .reduceOption(_+_).getOrElse(0L)
-
- val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
-
- spark.storage.html.index.
- render(maxMem, remainingMem, diskSpaceUsed, rdds, storageStatusList)
- }
- }}} ~
- get { path("rdd") { parameter("id") { id => { completeWith {
- val future = master ? GetStorageStatus
- future.map { status =>
- val prefix = "rdd_" + id.toString
-
-
- val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray
- val filteredStorageStatusList = StorageUtils.
- filterStorageStatusByPrefix(storageStatusList, prefix)
-
- val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
-
- spark.storage.html.rdd.render(rddInfo, filteredStorageStatusList)
-
+ get {
+ path("") {
+ completeWith {
+ // Request the current storage status from the Master
+ val future = blockManagerMaster ? GetStorageStatus
+ future.map { status =>
+ // Calculate macro-level statistics
+ val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray
+ val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
+ val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
+ val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
+ .reduceOption(_+_).getOrElse(0L)
+ val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
+ spark.storage.html.index.
+ render(maxMem, remainingMem, diskSpaceUsed, rdds, storageStatusList)
+ }
+ }
+ } ~
+ path("rdd") {
+ parameter("id") { id =>
+ completeWith {
+ val future = blockManagerMaster ? GetStorageStatus
+ future.map { status =>
+ val prefix = "rdd_" + id.toString
+ val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray
+ val filteredStorageStatusList = StorageUtils.
+ filterStorageStatusByPrefix(storageStatusList, prefix)
+ val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
+ spark.storage.html.rdd.render(rddInfo, filteredStorageStatusList)
+ }
+ }
+ }
+ } ~
+ pathPrefix("static") {
+ getFromResourceDirectory(STATIC_RESOURCE_DIR)
}
- }}}}} ~
- pathPrefix("static") {
- getFromResourceDirectory(STATIC_RESOURCE_DIR)
}
-
}
-
-
-
}
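
The port-selection logic above, restated as a small sketch (Utils.findFreePort is the helper the diff relies on; spark.ui.port is the property name shown above):

  val port = Option(System.getProperty("spark.ui.port")) match {
    case Some(p) => p.toInt
    case None    => Utils.findFreePort()  // spray can't report which port it bound when given 0
  }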
diff --git a/core/src/main/scala/spark/storage/ThreadingTest.scala b/core/src/main/scala/spark/storage/ThreadingTest.scala
index 689f07b969..f04c046c31 100644
--- a/core/src/main/scala/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/spark/storage/ThreadingTest.scala
@@ -78,7 +78,8 @@ private[spark] object ThreadingTest {
val masterIp: String = System.getProperty("spark.master.host", "localhost")
val masterPort: Int = System.getProperty("spark.master.port", "7077").toInt
val blockManagerMaster = new BlockManagerMaster(actorSystem, true, true, masterIp, masterPort)
- val blockManager = new BlockManager(actorSystem, blockManagerMaster, serializer, 1024 * 1024)
+ val blockManager = new BlockManager(
+ "<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024)
val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))
val consumers = producers.map(p => new ConsumerThread(blockManager, p.queue))
producers.foreach(_.start)
diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala
index ff2c3079be..e0fdeffbc4 100644
--- a/core/src/main/scala/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/spark/util/AkkaUtils.scala
@@ -1,6 +1,6 @@
package spark.util
-import akka.actor.{Props, ActorSystemImpl, ActorSystem}
+import akka.actor.{ActorRef, Props, ActorSystemImpl, ActorSystem}
import com.typesafe.config.ConfigFactory
import akka.util.duration._
import akka.pattern.ask
@@ -52,10 +52,10 @@ private[spark] object AkkaUtils {
/**
* Creates a Spray HTTP server bound to a given IP and port with a given Spray Route object to
- * handle requests. Throws a SparkException if this fails.
+ * handle requests. Returns the bound port or throws a SparkException on failure.
*/
def startSprayServer(actorSystem: ActorSystem, ip: String, port: Int, route: Route,
- name: String = "HttpServer") {
+ name: String = "HttpServer"): ActorRef = {
val ioWorker = new IoWorker(actorSystem).start()
val httpService = actorSystem.actorOf(Props(new HttpService(route)))
val rootService = actorSystem.actorOf(Props(new SprayCanRootService(httpService)))
@@ -67,7 +67,7 @@ private[spark] object AkkaUtils {
try {
Await.result(future, timeout) match {
case bound: HttpServer.Bound =>
- return
+ return server
case other: Any =>
throw new SparkException("Failed to bind web UI to port " + port + ": " + other)
}
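
With the changed return type, a caller can keep a handle to the bound server actor (illustrative only; port and handler come from the caller, as in BlockManagerUI above, and ActorRef is akka.actor.ActorRef):

  val server: ActorRef =
    AkkaUtils.startSprayServer(actorSystem, "0.0.0.0", port, handler, "BlockManagerHTTPServer")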
diff --git a/core/src/main/scala/spark/util/MetadataCleaner.scala b/core/src/main/scala/spark/util/MetadataCleaner.scala
index 139e21d09e..51fb440108 100644
--- a/core/src/main/scala/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/spark/util/MetadataCleaner.scala
@@ -5,6 +5,9 @@ import java.util.{TimerTask, Timer}
import spark.Logging
+/**
+ * Runs a timer task to periodically clean up metadata (e.g. old files or hashtable entries)
+ */
class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging {
val delaySeconds = MetadataCleaner.getDelaySeconds
@@ -14,18 +17,16 @@ class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging
val task = new TimerTask {
def run() {
try {
- if (delaySeconds > 0) {
- cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))
- logInfo("Ran metadata cleaner for " + name)
- }
+ cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))
+ logInfo("Ran metadata cleaner for " + name)
} catch {
case e: Exception => logError("Error running cleanup task for " + name, e)
}
}
}
- if (periodSeconds > 0) {
- logInfo(
+ if (delaySeconds > 0) {
+ logDebug(
"Starting metadata cleaner for " + name + " with delay of " + delaySeconds + " seconds and "
+ "period of " + periodSeconds + " secs")
timer.schedule(task, periodSeconds * 1000, periodSeconds * 1000)
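
Hedged wiring of the cleaner against a TimeStampedHashMap; clearOldValues is assumed to be that map's drop-entries-older-than hook (it is not shown in this diff):

  val taskInfos = new TimeStampedHashMap[Long, String]()
  // Assumed hook: remove entries whose timestamp is older than the cutoff passed in.
  val metadataCleaner = new MetadataCleaner("TaskSetManager", taskInfos.clearOldValues)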
diff --git a/core/src/main/scala/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/spark/util/TimeStampedHashMap.scala
index bb7c5c01c8..188f8910da 100644
--- a/core/src/main/scala/spark/util/TimeStampedHashMap.scala
+++ b/core/src/main/scala/spark/util/TimeStampedHashMap.scala
@@ -63,9 +63,9 @@ class TimeStampedHashMap[A, B] extends Map[A, B]() with spark.Logging {
override def empty: Map[A, B] = new TimeStampedHashMap[A, B]()
- override def size(): Int = internalMap.size()
+ override def size: Int = internalMap.size
- override def foreach[U](f: ((A, B)) => U): Unit = {
+ override def foreach[U](f: ((A, B)) => U) {
val iterator = internalMap.entrySet().iterator()
while(iterator.hasNext) {
val entry = iterator.next()
diff --git a/core/src/test/scala/spark/DriverSuite.scala b/core/src/test/scala/spark/DriverSuite.scala
index 70a7c8bc2f..342610e1dd 100644
--- a/core/src/test/scala/spark/DriverSuite.scala
+++ b/core/src/test/scala/spark/DriverSuite.scala
@@ -13,7 +13,8 @@ class DriverSuite extends FunSuite with Timeouts {
val masters = Table(("master"), ("local"), ("local-cluster[2,1,512]"))
forAll(masters) { (master: String) =>
failAfter(10 seconds) {
- Utils.execute(Seq("./run", "spark.DriverWithoutCleanup", master), new File(System.getenv("SPARK_HOME")))
+ Utils.execute(Seq("./run", "spark.DriverWithoutCleanup", master),
+ new File(System.getenv("SPARK_HOME")))
}
}
}
@@ -28,4 +29,4 @@ object DriverWithoutCleanup {
val sc = new SparkContext(args(0), "DriverWithoutCleanup")
sc.parallelize(1 to 100, 4).count()
}
-} \ No newline at end of file
+}
diff --git a/core/src/test/scala/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
index 7d5305f1e0..e8fe7ecabc 100644
--- a/core/src/test/scala/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
@@ -43,13 +43,13 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
val compressedSize10000 = MapOutputTracker.compressSize(10000L)
val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
val size10000 = MapOutputTracker.decompressSize(compressedSize10000)
- tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("hostA", 1000),
+ tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000),
Array(compressedSize1000, compressedSize10000)))
- tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("hostB", 1000),
+ tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000),
Array(compressedSize10000, compressedSize1000)))
val statuses = tracker.getServerStatuses(10, 0)
- assert(statuses.toSeq === Seq((BlockManagerId("hostA", 1000), size1000),
- (BlockManagerId("hostB", 1000), size10000)))
+ assert(statuses.toSeq === Seq((BlockManagerId("a", "hostA", 1000), size1000),
+ (BlockManagerId("b", "hostB", 1000), size10000)))
tracker.stop()
}
@@ -61,47 +61,52 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
val compressedSize10000 = MapOutputTracker.compressSize(10000L)
val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
val size10000 = MapOutputTracker.decompressSize(compressedSize10000)
- tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("hostA", 1000),
+ tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000),
Array(compressedSize1000, compressedSize1000, compressedSize1000)))
- tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("hostB", 1000),
+ tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000),
Array(compressedSize10000, compressedSize1000, compressedSize1000)))
// As if we had two simultaneous fetch failures
- tracker.unregisterMapOutput(10, 0, BlockManagerId("hostA", 1000))
- tracker.unregisterMapOutput(10, 0, BlockManagerId("hostA", 1000))
+ tracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000))
+ tracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000))
- // The remaining reduce task might try to grab the output dispite the shuffle failure;
+ // The remaining reduce task might try to grab the output despite the shuffle failure;
// this should cause it to fail, and the scheduler will ignore the failure due to the
// stage already being aborted.
intercept[FetchFailedException] { tracker.getServerStatuses(10, 1) }
}
test("remote fetch") {
- val (actorSystem, boundPort) =
- AkkaUtils.createActorSystem("test", "localhost", 0)
- System.setProperty("spark.master.port", boundPort.toString)
- val masterTracker = new MapOutputTracker(actorSystem, true)
- val slaveTracker = new MapOutputTracker(actorSystem, false)
- masterTracker.registerShuffle(10, 1)
- masterTracker.incrementGeneration()
- slaveTracker.updateGeneration(masterTracker.getGeneration)
- intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
+ try {
+ System.clearProperty("spark.master.host") // In case some previous test had set it
+ val (actorSystem, boundPort) =
+ AkkaUtils.createActorSystem("test", "localhost", 0)
+ System.setProperty("spark.master.port", boundPort.toString)
+ val masterTracker = new MapOutputTracker(actorSystem, true)
+ val slaveTracker = new MapOutputTracker(actorSystem, false)
+ masterTracker.registerShuffle(10, 1)
+ masterTracker.incrementGeneration()
+ slaveTracker.updateGeneration(masterTracker.getGeneration)
+ intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
- val compressedSize1000 = MapOutputTracker.compressSize(1000L)
- val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
- masterTracker.registerMapOutput(10, 0, new MapStatus(
- BlockManagerId("hostA", 1000), Array(compressedSize1000)))
- masterTracker.incrementGeneration()
- slaveTracker.updateGeneration(masterTracker.getGeneration)
- assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
- Seq((BlockManagerId("hostA", 1000), size1000)))
+ val compressedSize1000 = MapOutputTracker.compressSize(1000L)
+ val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
+ masterTracker.registerMapOutput(10, 0, new MapStatus(
+ BlockManagerId("a", "hostA", 1000), Array(compressedSize1000)))
+ masterTracker.incrementGeneration()
+ slaveTracker.updateGeneration(masterTracker.getGeneration)
+ assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
+ Seq((BlockManagerId("a", "hostA", 1000), size1000)))
- masterTracker.unregisterMapOutput(10, 0, BlockManagerId("hostA", 1000))
- masterTracker.incrementGeneration()
- slaveTracker.updateGeneration(masterTracker.getGeneration)
- intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
+ masterTracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000))
+ masterTracker.incrementGeneration()
+ slaveTracker.updateGeneration(masterTracker.getGeneration)
+ intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
- // failure should be cached
- intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
+ // failure should be cached
+ intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
+ } finally {
+ System.clearProperty("spark.master.port")
+ }
}
}
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index 2165744689..2d177bbf67 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -86,9 +86,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("BlockManagerId object caching") {
- val id1 = BlockManagerId("XXX", 1)
- val id2 = BlockManagerId("XXX", 1) // this should return the same object as id1
- val id3 = BlockManagerId("XXX", 2) // this should return a different object
+ val id1 = BlockManagerId("e1", "XXX", 1)
+ val id2 = BlockManagerId("e1", "XXX", 1) // this should return the same object as id1
+ val id3 = BlockManagerId("e1", "XXX", 2) // this should return a different object
assert(id2 === id1, "id2 is not same as id1")
assert(id2.eq(id1), "id2 is not the same object as id1")
assert(id3 != id1, "id3 is same as id1")
@@ -103,7 +103,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("master + 1 manager interaction") {
- store = new BlockManager(actorSystem, master, serializer, 2000)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -133,8 +133,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("master + 2 managers interaction") {
- store = new BlockManager(actorSystem, master, serializer, 2000)
- store2 = new BlockManager(actorSystem, master, new KryoSerializer, 2000)
+ store = new BlockManager("exec1", actorSystem, master, serializer, 2000)
+ store2 = new BlockManager("exec2", actorSystem, master, new KryoSerializer, 2000)
val peers = master.getPeers(store.blockManagerId, 1)
assert(peers.size === 1, "master did not return the other manager as a peer")
@@ -149,7 +149,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("removing block") {
- store = new BlockManager(actorSystem, master, serializer, 2000)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -198,7 +198,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
test("reregistration on heart beat") {
val heartBeat = PrivateMethod[Unit]('heartBeat)
- store = new BlockManager(actorSystem, master, serializer, 2000)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
val a1 = new Array[Byte](400)
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
@@ -206,7 +206,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(store.getSingle("a1") != None, "a1 was not in store")
assert(master.getLocations("a1").size > 0, "master was not told about a1")
- master.notifyADeadHost(store.blockManagerId.ip)
+ master.removeExecutor(store.blockManagerId.executorId)
assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
store invokePrivate heartBeat()
@@ -214,14 +214,14 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("reregistration on block update") {
- store = new BlockManager(actorSystem, master, serializer, 2000)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
assert(master.getLocations("a1").size > 0, "master was not told about a1")
- master.notifyADeadHost(store.blockManagerId.ip)
+ master.removeExecutor(store.blockManagerId.executorId)
assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
store.putSingle("a2", a1, StorageLevel.MEMORY_ONLY)
@@ -233,35 +233,35 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
test("reregistration doesn't dead lock") {
val heartBeat = PrivateMethod[Unit]('heartBeat)
- store = new BlockManager(actorSystem, master, serializer, 2000)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
val a1 = new Array[Byte](400)
val a2 = List(new Array[Byte](400))
// try many times to trigger any deadlocks
for (i <- 1 to 100) {
- master.notifyADeadHost(store.blockManagerId.ip)
+ master.removeExecutor(store.blockManagerId.executorId)
val t1 = new Thread {
- override def run = {
+ override def run() {
store.put("a2", a2.iterator, StorageLevel.MEMORY_ONLY, true)
}
}
val t2 = new Thread {
- override def run = {
+ override def run() {
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
}
}
val t3 = new Thread {
- override def run = {
+ override def run() {
store invokePrivate heartBeat()
}
}
- t1.start
- t2.start
- t3.start
- t1.join
- t2.join
- t3.join
+ t1.start()
+ t2.start()
+ t3.start()
+ t1.join()
+ t2.join()
+ t3.join()
store.dropFromMemory("a1", null)
store.dropFromMemory("a2", null)
@@ -270,7 +270,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU storage") {
- store = new BlockManager(actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -289,7 +289,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU storage with serialization") {
- store = new BlockManager(actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -308,14 +308,14 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU for partitions of same RDD") {
- store = new BlockManager(actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
store.putSingle("rdd_0_1", a1, StorageLevel.MEMORY_ONLY)
store.putSingle("rdd_0_2", a2, StorageLevel.MEMORY_ONLY)
store.putSingle("rdd_0_3", a3, StorageLevel.MEMORY_ONLY)
- // Even though we accessed rdd_0_3 last, it should not have replaced partitiosn 1 and 2
+ // Even though we accessed rdd_0_3 last, it should not have replaced partitions 1 and 2
// from the same RDD
assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store")
assert(store.getSingle("rdd_0_2") != None, "rdd_0_2 was not in store")
@@ -327,7 +327,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU for partitions of multiple RDDs") {
- store = new BlockManager(actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
store.putSingle("rdd_0_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
store.putSingle("rdd_0_2", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
store.putSingle("rdd_1_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
@@ -350,7 +350,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("on-disk storage") {
- store = new BlockManager(actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -363,7 +363,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage") {
- store = new BlockManager(actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -378,7 +378,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage with getLocalBytes") {
- store = new BlockManager(actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -393,7 +393,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage with serialization") {
- store = new BlockManager(actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -408,7 +408,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage with serialization and getLocalBytes") {
- store = new BlockManager(actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -423,7 +423,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("LRU with mixed storage levels") {
- store = new BlockManager(actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -448,7 +448,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU with streams") {
- store = new BlockManager(actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
val list1 = List(new Array[Byte](200), new Array[Byte](200))
val list2 = List(new Array[Byte](200), new Array[Byte](200))
val list3 = List(new Array[Byte](200), new Array[Byte](200))
@@ -472,7 +472,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("LRU with mixed storage levels and streams") {
- store = new BlockManager(actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
val list1 = List(new Array[Byte](200), new Array[Byte](200))
val list2 = List(new Array[Byte](200), new Array[Byte](200))
val list3 = List(new Array[Byte](200), new Array[Byte](200))
@@ -518,7 +518,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("overly large block") {
- store = new BlockManager(actorSystem, master, serializer, 500)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 500)
store.putSingle("a1", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
assert(store.getSingle("a1") === None, "a1 was in store")
store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK)
@@ -529,49 +529,49 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
test("block compression") {
try {
System.setProperty("spark.shuffle.compress", "true")
- store = new BlockManager(actorSystem, master, serializer, 2000)
+ store = new BlockManager("exec1", actorSystem, master, serializer, 2000)
store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("shuffle_0_0_0") <= 100, "shuffle_0_0_0 was not compressed")
store.stop()
store = null
System.setProperty("spark.shuffle.compress", "false")
- store = new BlockManager(actorSystem, master, serializer, 2000)
+ store = new BlockManager("exec2", actorSystem, master, serializer, 2000)
store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("shuffle_0_0_0") >= 1000, "shuffle_0_0_0 was compressed")
store.stop()
store = null
System.setProperty("spark.broadcast.compress", "true")
- store = new BlockManager(actorSystem, master, serializer, 2000)
+ store = new BlockManager("exec3", actorSystem, master, serializer, 2000)
store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("broadcast_0") <= 100, "broadcast_0 was not compressed")
store.stop()
store = null
System.setProperty("spark.broadcast.compress", "false")
- store = new BlockManager(actorSystem, master, serializer, 2000)
+ store = new BlockManager("exec4", actorSystem, master, serializer, 2000)
store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("broadcast_0") >= 1000, "broadcast_0 was compressed")
store.stop()
store = null
System.setProperty("spark.rdd.compress", "true")
- store = new BlockManager(actorSystem, master, serializer, 2000)
+ store = new BlockManager("exec5", actorSystem, master, serializer, 2000)
store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("rdd_0_0") <= 100, "rdd_0_0 was not compressed")
store.stop()
store = null
System.setProperty("spark.rdd.compress", "false")
- store = new BlockManager(actorSystem, master, serializer, 2000)
+ store = new BlockManager("exec6", actorSystem, master, serializer, 2000)
store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("rdd_0_0") >= 1000, "rdd_0_0 was compressed")
store.stop()
store = null
// Check that any other block types are also kept uncompressed
- store = new BlockManager(actorSystem, master, serializer, 2000)
+ store = new BlockManager("exec7", actorSystem, master, serializer, 2000)
store.putSingle("other_block", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
assert(store.memoryStore.getSize("other_block") >= 1000, "other_block was compressed")
store.stop()
diff --git a/sbt/sbt b/sbt/sbt
index a3055c13c1..8f426d18e8 100755
--- a/sbt/sbt
+++ b/sbt/sbt
@@ -5,4 +5,4 @@ if [ "$MESOS_HOME" != "" ]; then
fi
export SPARK_HOME=$(cd "$(dirname $0)/.."; pwd)
export SPARK_TESTING=1 # To put test classes on classpath
-java -Xmx1200M -XX:MaxPermSize=200m $EXTRA_ARGS -jar $SPARK_HOME/sbt/sbt-launch-*.jar "$@"
+java -Xmx1200M -XX:MaxPermSize=250m $EXTRA_ARGS -jar $SPARK_HOME/sbt/sbt-launch-*.jar "$@"