aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorImran Rashid <imran@quantifind.com>2013-01-30 18:52:35 -0800
committerImran Rashid <imran@quantifind.com>2013-01-30 18:52:35 -0800
commit02a6761589c35f15f1a6e3b63a7964ba057d3ba6 (patch)
tree5349b811fb721d440c77265b3c13061bcdca5982 /core
parentc1df24d0850b0ac89f35f1a47ce6b2fb5b95df0a (diff)
parent55327a283e962652a126d3f8ac7e9a19c76f1f19 (diff)
downloadspark-02a6761589c35f15f1a6e3b63a7964ba057d3ba6.tar.gz
spark-02a6761589c35f15f1a6e3b63a7964ba057d3ba6.tar.bz2
spark-02a6761589c35f15f1a6e3b63a7964ba057d3ba6.zip
Merge branch 'master' into blockmanager_info
Conflicts: core/src/main/scala/spark/storage/BlockManagerMaster.scala
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/MapOutputTracker.scala10
-rw-r--r--core/src/main/scala/spark/RDD.scala16
-rw-r--r--core/src/main/scala/spark/SparkContext.scala22
-rw-r--r--core/src/main/scala/spark/SparkEnv.scala22
-rw-r--r--core/src/main/scala/spark/api/java/JavaRDDLike.scala5
-rw-r--r--core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala24
-rw-r--r--core/src/main/scala/spark/broadcast/Broadcast.scala6
-rw-r--r--core/src/main/scala/spark/broadcast/BroadcastFactory.scala4
-rw-r--r--core/src/main/scala/spark/broadcast/HttpBroadcast.scala6
-rw-r--r--core/src/main/scala/spark/broadcast/MultiTracker.scala35
-rw-r--r--core/src/main/scala/spark/broadcast/TreeBroadcast.scala52
-rw-r--r--core/src/main/scala/spark/deploy/LocalSparkCluster.scala27
-rw-r--r--core/src/main/scala/spark/deploy/client/ClientListener.scala4
-rw-r--r--core/src/main/scala/spark/deploy/master/JobInfo.scala2
-rw-r--r--core/src/main/scala/spark/deploy/master/Master.scala24
-rw-r--r--core/src/main/scala/spark/deploy/worker/Worker.scala14
-rw-r--r--core/src/main/scala/spark/executor/MesosExecutorBackend.scala6
-rw-r--r--core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala24
-rw-r--r--core/src/main/scala/spark/network/Connection.scala15
-rw-r--r--core/src/main/scala/spark/network/ConnectionManager.scala3
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala17
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala8
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala24
-rw-r--r--core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala6
-rw-r--r--core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala30
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerMaster.scala69
-rw-r--r--core/src/main/scala/spark/storage/ThreadingTest.scala6
-rw-r--r--core/src/test/scala/spark/JavaAPISuite.java2
-rw-r--r--core/src/test/scala/spark/LocalSparkContext.scala2
-rw-r--r--core/src/test/scala/spark/MapOutputTrackerSuite.scala6
30 files changed, 264 insertions, 227 deletions
diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
index c1f012b419..aaf433b324 100644
--- a/core/src/main/scala/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/spark/MapOutputTracker.scala
@@ -38,10 +38,7 @@ private[spark] class MapOutputTrackerActor(tracker: MapOutputTracker) extends Ac
}
}
-private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolean) extends Logging {
- val ip: String = System.getProperty("spark.master.host", "localhost")
- val port: Int = System.getProperty("spark.master.port", "7077").toInt
- val actorName: String = "MapOutputTracker"
+private[spark] class MapOutputTracker(actorSystem: ActorSystem, isDriver: Boolean) extends Logging {
val timeout = 10.seconds
@@ -56,11 +53,14 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
var cacheGeneration = generation
val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]
- var trackerActor: ActorRef = if (isMaster) {
+ val actorName: String = "MapOutputTracker"
+ var trackerActor: ActorRef = if (isDriver) {
val actor = actorSystem.actorOf(Props(new MapOutputTrackerActor(this)), name = actorName)
logInfo("Registered MapOutputTrackerActor actor")
actor
} else {
+ val ip = System.getProperty("spark.driver.host", "localhost")
+ val port = System.getProperty("spark.driver.port", "7077").toInt
val url = "akka://spark@%s:%s/user/%s".format(ip, port, actorName)
actorSystem.actorFor(url)
}
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index dbad6d4c83..210404d540 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -654,4 +654,20 @@ abstract class RDD[T: ClassManifest](
protected def clearDependencies() {
dependencies_ = null
}
+
+ /** A description of this RDD and its recursive dependencies for debugging. */
+ def toDebugString(): String = {
+ def debugString(rdd: RDD[_], prefix: String = ""): Seq[String] = {
+ Seq(prefix + rdd + " (" + rdd.splits.size + " splits)") ++
+ rdd.dependencies.flatMap(d => debugString(d.rdd, prefix + " "))
+ }
+ debugString(this).mkString("\n")
+ }
+
+ override def toString(): String = "%s%s[%d] at %s".format(
+ Option(name).map(_ + " ").getOrElse(""),
+ getClass.getSimpleName,
+ id,
+ origin)
+
}
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 39e3555de8..d0eefd5229 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -70,12 +70,12 @@ class SparkContext(
// Ensure logging is initialized before we spawn any threads
initLogging()
- // Set Spark master host and port system properties
- if (System.getProperty("spark.master.host") == null) {
- System.setProperty("spark.master.host", Utils.localIpAddress)
+ // Set Spark driver host and port system properties
+ if (System.getProperty("spark.driver.host") == null) {
+ System.setProperty("spark.driver.host", Utils.localIpAddress)
}
- if (System.getProperty("spark.master.port") == null) {
- System.setProperty("spark.master.port", "0")
+ if (System.getProperty("spark.driver.port") == null) {
+ System.setProperty("spark.driver.port", "0")
}
private val isLocal = (master == "local" || master.startsWith("local["))
@@ -83,15 +83,15 @@ class SparkContext(
// Create the Spark execution environment (cache, map output tracker, etc)
private[spark] val env = SparkEnv.createFromSystemProperties(
"<driver>",
- System.getProperty("spark.master.host"),
- System.getProperty("spark.master.port").toInt,
+ System.getProperty("spark.driver.host"),
+ System.getProperty("spark.driver.port").toInt,
true,
isLocal)
SparkEnv.set(env)
// Start the BlockManager UI
private[spark] val ui = new BlockManagerUI(
- env.actorSystem, env.blockManager.master.masterActor, this)
+ env.actorSystem, env.blockManager.master.driverActor, this)
ui.start()
// Used to store a URL for each static file/jar together with the file's local timestamp
@@ -411,14 +411,14 @@ class SparkContext(
/**
* Create an [[spark.Accumulator]] variable of a given type, which tasks can "add" values
- * to using the `+=` method. Only the master can access the accumulator's `value`.
+ * to using the `+=` method. Only the driver can access the accumulator's `value`.
*/
def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]) =
new Accumulator(initialValue, param)
/**
* Create an [[spark.Accumulable]] shared variable, to which tasks can add values with `+=`.
- * Only the master can access the accumuable's `value`.
+ * Only the driver can access the accumuable's `value`.
* @tparam T accumulator type
* @tparam R type that can be added to the accumulator
*/
@@ -561,7 +561,7 @@ class SparkContext(
/**
* Run a function on a given set of partitions in an RDD and return the results. This is the main
* entry point to the scheduler, by which all actions get launched. The allowLocal flag specifies
- * whether the scheduler can run the computation on the master rather than shipping it out to the
+ * whether the scheduler can run the computation on the driver rather than shipping it out to the
* cluster, for short actions like first().
*/
def runJob[T, U: ClassManifest](
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 0c094edcf3..d2193ae72b 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -62,15 +62,15 @@ object SparkEnv extends Logging {
executorId: String,
hostname: String,
port: Int,
- isMaster: Boolean,
+ isDriver: Boolean,
isLocal: Boolean): SparkEnv = {
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port)
- // Bit of a hack: If this is the master and our port was 0 (meaning bind to any free port),
- // figure out which port number Akka actually bound to and set spark.master.port to it.
- if (isMaster && port == 0) {
- System.setProperty("spark.master.port", boundPort.toString)
+ // Bit of a hack: If this is the driver and our port was 0 (meaning bind to any free port),
+ // figure out which port number Akka actually bound to and set spark.driver.port to it.
+ if (isDriver && port == 0) {
+ System.setProperty("spark.driver.port", boundPort.toString)
}
val classLoader = Thread.currentThread.getContextClassLoader
@@ -84,22 +84,22 @@ object SparkEnv extends Logging {
val serializer = instantiateClass[Serializer]("spark.serializer", "spark.JavaSerializer")
- val masterIp: String = System.getProperty("spark.master.host", "localhost")
- val masterPort: Int = System.getProperty("spark.master.port", "7077").toInt
+ val driverIp: String = System.getProperty("spark.driver.host", "localhost")
+ val driverPort: Int = System.getProperty("spark.driver.port", "7077").toInt
val blockManagerMaster = new BlockManagerMaster(
- actorSystem, isMaster, isLocal, masterIp, masterPort)
+ actorSystem, isDriver, isLocal, driverIp, driverPort)
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer)
val connectionManager = blockManager.connectionManager
- val broadcastManager = new BroadcastManager(isMaster)
+ val broadcastManager = new BroadcastManager(isDriver)
val closureSerializer = instantiateClass[Serializer](
"spark.closure.serializer", "spark.JavaSerializer")
val cacheManager = new CacheManager(blockManager)
- val mapOutputTracker = new MapOutputTracker(actorSystem, isMaster)
+ val mapOutputTracker = new MapOutputTracker(actorSystem, isDriver)
val shuffleFetcher = instantiateClass[ShuffleFetcher](
"spark.shuffle.fetcher", "spark.BlockStoreShuffleFetcher")
@@ -111,7 +111,7 @@ object SparkEnv extends Logging {
// Set the sparkFiles directory, used when downloading dependencies. In local mode,
// this is a temporary directory; in distributed mode, this is the executor's current working
// directory.
- val sparkFilesDir: String = if (isMaster) {
+ val sparkFilesDir: String = if (isDriver) {
Utils.createTempDir().getAbsolutePath
} else {
"."
diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
index 46fd8fe85e..60025b459c 100644
--- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
@@ -330,4 +330,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround
case _ => Optional.absent()
}
}
+
+ /** A description of this RDD and its recursive dependencies for debugging. */
+ def toDebugString(): String = {
+ rdd.toDebugString()
+ }
}
diff --git a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
index 386f505f2a..adcb2d2415 100644
--- a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
@@ -31,7 +31,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
@transient var totalBlocks = -1
@transient var hasBlocks = new AtomicInteger(0)
- // Used ONLY by Master to track how many unique blocks have been sent out
+ // Used ONLY by driver to track how many unique blocks have been sent out
@transient var sentBlocks = new AtomicInteger(0)
@transient var listenPortLock = new Object
@@ -42,7 +42,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
@transient var serveMR: ServeMultipleRequests = null
- // Used only in Master
+ // Used only in driver
@transient var guideMR: GuideMultipleRequests = null
// Used only in Workers
@@ -99,14 +99,14 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
}
// Must always come AFTER listenPort is created
- val masterSource =
+ val driverSource =
SourceInfo(hostAddress, listenPort, totalBlocks, totalBytes)
hasBlocksBitVector.synchronized {
- masterSource.hasBlocksBitVector = hasBlocksBitVector
+ driverSource.hasBlocksBitVector = hasBlocksBitVector
}
// In the beginning, this is the only known source to Guide
- listOfSources += masterSource
+ listOfSources += driverSource
// Register with the Tracker
MultiTracker.registerBroadcast(id,
@@ -122,7 +122,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
case None =>
logInfo("Started reading broadcast variable " + id)
- // Initializing everything because Master will only send null/0 values
+ // Initializing everything because driver will only send null/0 values
// Only the 1st worker in a node can be here. Others will get from cache
initializeWorkerVariables()
@@ -151,7 +151,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
}
}
- // Initialize variables in the worker node. Master sends everything as 0/null
+ // Initialize variables in the worker node. Driver sends everything as 0/null
private def initializeWorkerVariables() {
arrayOfBlocks = null
hasBlocksBitVector = null
@@ -248,7 +248,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
// Receive source information from Guide
var suitableSources =
oisGuide.readObject.asInstanceOf[ListBuffer[SourceInfo]]
- logDebug("Received suitableSources from Master " + suitableSources)
+ logDebug("Received suitableSources from Driver " + suitableSources)
addToListOfSources(suitableSources)
@@ -532,7 +532,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
oosSource.writeObject(blockToAskFor)
oosSource.flush()
- // CHANGED: Master might send some other block than the one
+ // CHANGED: Driver might send some other block than the one
// requested to ensure fast spreading of all blocks.
val recvStartTime = System.currentTimeMillis
val bcBlock = oisSource.readObject.asInstanceOf[BroadcastBlock]
@@ -982,9 +982,9 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
// Receive which block to send
var blockToSend = ois.readObject.asInstanceOf[Int]
- // If it is master AND at least one copy of each block has not been
+ // If it is driver AND at least one copy of each block has not been
// sent out already, MODIFY blockToSend
- if (MultiTracker.isMaster && sentBlocks.get < totalBlocks) {
+ if (MultiTracker.isDriver && sentBlocks.get < totalBlocks) {
blockToSend = sentBlocks.getAndIncrement
}
@@ -1031,7 +1031,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
private[spark] class BitTorrentBroadcastFactory
extends BroadcastFactory {
- def initialize(isMaster: Boolean) { MultiTracker.initialize(isMaster) }
+ def initialize(isDriver: Boolean) { MultiTracker.initialize(isDriver) }
def newBroadcast[T](value_ : T, isLocal: Boolean, id: Long) =
new BitTorrentBroadcast[T](value_, isLocal, id)
diff --git a/core/src/main/scala/spark/broadcast/Broadcast.scala b/core/src/main/scala/spark/broadcast/Broadcast.scala
index 2ffe7f741d..415bde5d67 100644
--- a/core/src/main/scala/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/spark/broadcast/Broadcast.scala
@@ -15,7 +15,7 @@ abstract class Broadcast[T](private[spark] val id: Long) extends Serializable {
}
private[spark]
-class BroadcastManager(val isMaster_ : Boolean) extends Logging with Serializable {
+class BroadcastManager(val _isDriver: Boolean) extends Logging with Serializable {
private var initialized = false
private var broadcastFactory: BroadcastFactory = null
@@ -33,7 +33,7 @@ class BroadcastManager(val isMaster_ : Boolean) extends Logging with Serializabl
Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
// Initialize appropriate BroadcastFactory and BroadcastObject
- broadcastFactory.initialize(isMaster)
+ broadcastFactory.initialize(isDriver)
initialized = true
}
@@ -49,5 +49,5 @@ class BroadcastManager(val isMaster_ : Boolean) extends Logging with Serializabl
def newBroadcast[T](value_ : T, isLocal: Boolean) =
broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
- def isMaster = isMaster_
+ def isDriver = _isDriver
}
diff --git a/core/src/main/scala/spark/broadcast/BroadcastFactory.scala b/core/src/main/scala/spark/broadcast/BroadcastFactory.scala
index ab6d302827..5c6184c3c7 100644
--- a/core/src/main/scala/spark/broadcast/BroadcastFactory.scala
+++ b/core/src/main/scala/spark/broadcast/BroadcastFactory.scala
@@ -7,7 +7,7 @@ package spark.broadcast
* entire Spark job.
*/
private[spark] trait BroadcastFactory {
- def initialize(isMaster: Boolean): Unit
- def newBroadcast[T](value_ : T, isLocal: Boolean, id: Long): Broadcast[T]
+ def initialize(isDriver: Boolean): Unit
+ def newBroadcast[T](value: T, isLocal: Boolean, id: Long): Broadcast[T]
def stop(): Unit
}
diff --git a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
index 8e490e6bad..7e30b8f7d2 100644
--- a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
@@ -48,7 +48,7 @@ extends Broadcast[T](id) with Logging with Serializable {
}
private[spark] class HttpBroadcastFactory extends BroadcastFactory {
- def initialize(isMaster: Boolean) { HttpBroadcast.initialize(isMaster) }
+ def initialize(isDriver: Boolean) { HttpBroadcast.initialize(isDriver) }
def newBroadcast[T](value_ : T, isLocal: Boolean, id: Long) =
new HttpBroadcast[T](value_, isLocal, id)
@@ -69,12 +69,12 @@ private object HttpBroadcast extends Logging {
private val cleaner = new MetadataCleaner("HttpBroadcast", cleanup)
- def initialize(isMaster: Boolean) {
+ def initialize(isDriver: Boolean) {
synchronized {
if (!initialized) {
bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
compress = System.getProperty("spark.broadcast.compress", "true").toBoolean
- if (isMaster) {
+ if (isDriver) {
createServer()
}
serverUri = System.getProperty("spark.httpBroadcast.uri")
diff --git a/core/src/main/scala/spark/broadcast/MultiTracker.scala b/core/src/main/scala/spark/broadcast/MultiTracker.scala
index 5e76dedb94..3fd77af73f 100644
--- a/core/src/main/scala/spark/broadcast/MultiTracker.scala
+++ b/core/src/main/scala/spark/broadcast/MultiTracker.scala
@@ -23,25 +23,24 @@ extends Logging {
var ranGen = new Random
private var initialized = false
- private var isMaster_ = false
+ private var _isDriver = false
private var stopBroadcast = false
private var trackMV: TrackMultipleValues = null
- def initialize(isMaster__ : Boolean) {
+ def initialize(__isDriver: Boolean) {
synchronized {
if (!initialized) {
+ _isDriver = __isDriver
- isMaster_ = isMaster__
-
- if (isMaster) {
+ if (isDriver) {
trackMV = new TrackMultipleValues
trackMV.setDaemon(true)
trackMV.start()
- // Set masterHostAddress to the master's IP address for the slaves to read
- System.setProperty("spark.MultiTracker.MasterHostAddress", Utils.localIpAddress)
+ // Set DriverHostAddress to the driver's IP address for the slaves to read
+ System.setProperty("spark.MultiTracker.DriverHostAddress", Utils.localIpAddress)
}
initialized = true
@@ -54,10 +53,10 @@ extends Logging {
}
// Load common parameters
- private var MasterHostAddress_ = System.getProperty(
- "spark.MultiTracker.MasterHostAddress", "")
- private var MasterTrackerPort_ = System.getProperty(
- "spark.broadcast.masterTrackerPort", "11111").toInt
+ private var DriverHostAddress_ = System.getProperty(
+ "spark.MultiTracker.DriverHostAddress", "")
+ private var DriverTrackerPort_ = System.getProperty(
+ "spark.broadcast.driverTrackerPort", "11111").toInt
private var BlockSize_ = System.getProperty(
"spark.broadcast.blockSize", "4096").toInt * 1024
private var MaxRetryCount_ = System.getProperty(
@@ -91,11 +90,11 @@ extends Logging {
private var EndGameFraction_ = System.getProperty(
"spark.broadcast.endGameFraction", "0.95").toDouble
- def isMaster = isMaster_
+ def isDriver = _isDriver
// Common config params
- def MasterHostAddress = MasterHostAddress_
- def MasterTrackerPort = MasterTrackerPort_
+ def DriverHostAddress = DriverHostAddress_
+ def DriverTrackerPort = DriverTrackerPort_
def BlockSize = BlockSize_
def MaxRetryCount = MaxRetryCount_
@@ -123,7 +122,7 @@ extends Logging {
var threadPool = Utils.newDaemonCachedThreadPool()
var serverSocket: ServerSocket = null
- serverSocket = new ServerSocket(MasterTrackerPort)
+ serverSocket = new ServerSocket(DriverTrackerPort)
logInfo("TrackMultipleValues started at " + serverSocket)
try {
@@ -235,7 +234,7 @@ extends Logging {
try {
// Connect to the tracker to find out GuideInfo
clientSocketToTracker =
- new Socket(MultiTracker.MasterHostAddress, MultiTracker.MasterTrackerPort)
+ new Socket(MultiTracker.DriverHostAddress, MultiTracker.DriverTrackerPort)
oosTracker =
new ObjectOutputStream(clientSocketToTracker.getOutputStream)
oosTracker.flush()
@@ -276,7 +275,7 @@ extends Logging {
}
def registerBroadcast(id: Long, gInfo: SourceInfo) {
- val socket = new Socket(MultiTracker.MasterHostAddress, MasterTrackerPort)
+ val socket = new Socket(MultiTracker.DriverHostAddress, DriverTrackerPort)
val oosST = new ObjectOutputStream(socket.getOutputStream)
oosST.flush()
val oisST = new ObjectInputStream(socket.getInputStream)
@@ -303,7 +302,7 @@ extends Logging {
}
def unregisterBroadcast(id: Long) {
- val socket = new Socket(MultiTracker.MasterHostAddress, MasterTrackerPort)
+ val socket = new Socket(MultiTracker.DriverHostAddress, DriverTrackerPort)
val oosST = new ObjectOutputStream(socket.getOutputStream)
oosST.flush()
val oisST = new ObjectInputStream(socket.getInputStream)
diff --git a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
index f573512835..c55c476117 100644
--- a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
@@ -98,7 +98,7 @@ extends Broadcast[T](id) with Logging with Serializable {
case None =>
logInfo("Started reading broadcast variable " + id)
- // Initializing everything because Master will only send null/0 values
+ // Initializing everything because Driver will only send null/0 values
// Only the 1st worker in a node can be here. Others will get from cache
initializeWorkerVariables()
@@ -157,55 +157,55 @@ extends Broadcast[T](id) with Logging with Serializable {
listenPortLock.synchronized { listenPortLock.wait() }
}
- var clientSocketToMaster: Socket = null
- var oosMaster: ObjectOutputStream = null
- var oisMaster: ObjectInputStream = null
+ var clientSocketToDriver: Socket = null
+ var oosDriver: ObjectOutputStream = null
+ var oisDriver: ObjectInputStream = null
// Connect and receive broadcast from the specified source, retrying the
// specified number of times in case of failures
var retriesLeft = MultiTracker.MaxRetryCount
do {
- // Connect to Master and send this worker's Information
- clientSocketToMaster = new Socket(MultiTracker.MasterHostAddress, gInfo.listenPort)
- oosMaster = new ObjectOutputStream(clientSocketToMaster.getOutputStream)
- oosMaster.flush()
- oisMaster = new ObjectInputStream(clientSocketToMaster.getInputStream)
+ // Connect to Driver and send this worker's Information
+ clientSocketToDriver = new Socket(MultiTracker.DriverHostAddress, gInfo.listenPort)
+ oosDriver = new ObjectOutputStream(clientSocketToDriver.getOutputStream)
+ oosDriver.flush()
+ oisDriver = new ObjectInputStream(clientSocketToDriver.getInputStream)
- logDebug("Connected to Master's guiding object")
+ logDebug("Connected to Driver's guiding object")
// Send local source information
- oosMaster.writeObject(SourceInfo(hostAddress, listenPort))
- oosMaster.flush()
+ oosDriver.writeObject(SourceInfo(hostAddress, listenPort))
+ oosDriver.flush()
- // Receive source information from Master
- var sourceInfo = oisMaster.readObject.asInstanceOf[SourceInfo]
+ // Receive source information from Driver
+ var sourceInfo = oisDriver.readObject.asInstanceOf[SourceInfo]
totalBlocks = sourceInfo.totalBlocks
arrayOfBlocks = new Array[BroadcastBlock](totalBlocks)
totalBlocksLock.synchronized { totalBlocksLock.notifyAll() }
totalBytes = sourceInfo.totalBytes
- logDebug("Received SourceInfo from Master:" + sourceInfo + " My Port: " + listenPort)
+ logDebug("Received SourceInfo from Driver:" + sourceInfo + " My Port: " + listenPort)
val start = System.nanoTime
val receptionSucceeded = receiveSingleTransmission(sourceInfo)
val time = (System.nanoTime - start) / 1e9
- // Updating some statistics in sourceInfo. Master will be using them later
+ // Updating some statistics in sourceInfo. Driver will be using them later
if (!receptionSucceeded) {
sourceInfo.receptionFailed = true
}
- // Send back statistics to the Master
- oosMaster.writeObject(sourceInfo)
+ // Send back statistics to the Driver
+ oosDriver.writeObject(sourceInfo)
- if (oisMaster != null) {
- oisMaster.close()
+ if (oisDriver != null) {
+ oisDriver.close()
}
- if (oosMaster != null) {
- oosMaster.close()
+ if (oosDriver != null) {
+ oosDriver.close()
}
- if (clientSocketToMaster != null) {
- clientSocketToMaster.close()
+ if (clientSocketToDriver != null) {
+ clientSocketToDriver.close()
}
retriesLeft -= 1
@@ -552,7 +552,7 @@ extends Broadcast[T](id) with Logging with Serializable {
}
private def sendObject() {
- // Wait till receiving the SourceInfo from Master
+ // Wait till receiving the SourceInfo from Driver
while (totalBlocks == -1) {
totalBlocksLock.synchronized { totalBlocksLock.wait() }
}
@@ -576,7 +576,7 @@ extends Broadcast[T](id) with Logging with Serializable {
private[spark] class TreeBroadcastFactory
extends BroadcastFactory {
- def initialize(isMaster: Boolean) { MultiTracker.initialize(isMaster) }
+ def initialize(isDriver: Boolean) { MultiTracker.initialize(isDriver) }
def newBroadcast[T](value_ : T, isLocal: Boolean, id: Long) =
new TreeBroadcast[T](value_, isLocal, id)
diff --git a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
index 8f51051e39..2836574ecb 100644
--- a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
@@ -16,7 +16,7 @@ import scala.collection.mutable.ArrayBuffer
* fault recovery without spinning up a lot of processes.
*/
private[spark]
-class LocalSparkCluster(numSlaves: Int, coresPerSlave: Int, memoryPerSlave: Int) extends Logging {
+class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: Int) extends Logging {
val localIpAddress = Utils.localIpAddress
@@ -25,29 +25,28 @@ class LocalSparkCluster(numSlaves: Int, coresPerSlave: Int, memoryPerSlave: Int)
var masterPort : Int = _
var masterUrl : String = _
- val slaveActorSystems = ArrayBuffer[ActorSystem]()
- val slaveActors = ArrayBuffer[ActorRef]()
+ val workerActorSystems = ArrayBuffer[ActorSystem]()
+ val workerActors = ArrayBuffer[ActorRef]()
def start() : String = {
- logInfo("Starting a local Spark cluster with " + numSlaves + " slaves.")
+ logInfo("Starting a local Spark cluster with " + numWorkers + " workers.")
/* Start the Master */
val (actorSystem, masterPort) = AkkaUtils.createActorSystem("sparkMaster", localIpAddress, 0)
masterActorSystem = actorSystem
masterUrl = "spark://" + localIpAddress + ":" + masterPort
- val actor = masterActorSystem.actorOf(
+ masterActor = masterActorSystem.actorOf(
Props(new Master(localIpAddress, masterPort, 0)), name = "Master")
- masterActor = actor
/* Start the Slaves */
- for (slaveNum <- 1 to numSlaves) {
+ for (workerNum <- 1 to numWorkers) {
val (actorSystem, boundPort) =
- AkkaUtils.createActorSystem("sparkWorker" + slaveNum, localIpAddress, 0)
- slaveActorSystems += actorSystem
+ AkkaUtils.createActorSystem("sparkWorker" + workerNum, localIpAddress, 0)
+ workerActorSystems += actorSystem
val actor = actorSystem.actorOf(
- Props(new Worker(localIpAddress, boundPort, 0, coresPerSlave, memoryPerSlave, masterUrl)),
+ Props(new Worker(localIpAddress, boundPort, 0, coresPerWorker, memoryPerWorker, masterUrl)),
name = "Worker")
- slaveActors += actor
+ workerActors += actor
}
return masterUrl
@@ -55,9 +54,9 @@ class LocalSparkCluster(numSlaves: Int, coresPerSlave: Int, memoryPerSlave: Int)
def stop() {
logInfo("Shutting down local Spark cluster.")
- // Stop the slaves before the master so they don't get upset that it disconnected
- slaveActorSystems.foreach(_.shutdown())
- slaveActorSystems.foreach(_.awaitTermination())
+ // Stop the workers before the master so they don't get upset that it disconnected
+ workerActorSystems.foreach(_.shutdown())
+ workerActorSystems.foreach(_.awaitTermination())
masterActorSystem.shutdown()
masterActorSystem.awaitTermination()
}
diff --git a/core/src/main/scala/spark/deploy/client/ClientListener.scala b/core/src/main/scala/spark/deploy/client/ClientListener.scala
index da6abcc9c2..7035f4b394 100644
--- a/core/src/main/scala/spark/deploy/client/ClientListener.scala
+++ b/core/src/main/scala/spark/deploy/client/ClientListener.scala
@@ -12,7 +12,7 @@ private[spark] trait ClientListener {
def disconnected(): Unit
- def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int): Unit
+ def executorAdded(fullId: String, workerId: String, host: String, cores: Int, memory: Int): Unit
- def executorRemoved(id: String, message: String, exitStatus: Option[Int]): Unit
+ def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit
}
diff --git a/core/src/main/scala/spark/deploy/master/JobInfo.scala b/core/src/main/scala/spark/deploy/master/JobInfo.scala
index 130b031a2a..a274b21c34 100644
--- a/core/src/main/scala/spark/deploy/master/JobInfo.scala
+++ b/core/src/main/scala/spark/deploy/master/JobInfo.scala
@@ -10,7 +10,7 @@ private[spark] class JobInfo(
val id: String,
val desc: JobDescription,
val submitDate: Date,
- val actor: ActorRef)
+ val driver: ActorRef)
{
var state = JobState.WAITING
var executors = new mutable.HashMap[Int, ExecutorInfo]
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 2e7e868579..c618e87cdd 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -88,7 +88,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
execOption match {
case Some(exec) => {
exec.state = state
- exec.job.actor ! ExecutorUpdated(execId, state, message, exitStatus)
+ exec.job.driver ! ExecutorUpdated(execId, state, message, exitStatus)
if (ExecutorState.isFinished(state)) {
val jobInfo = idToJob(jobId)
// Remove this executor from the worker and job
@@ -100,11 +100,9 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
if (jobInfo.incrementRetryCount < JobState.MAX_NUM_RETRY) {
schedule()
} else {
- val e = new SparkException("Job %s with ID %s failed %d times.".format(
+ logError("Job %s with ID %s failed %d times, removing it".format(
jobInfo.desc.name, jobInfo.id, jobInfo.retryCount))
- logError(e.getMessage, e)
- throw e
- //System.exit(1)
+ removeJob(jobInfo)
}
}
}
@@ -199,7 +197,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec)
worker.actor ! LaunchExecutor(exec.job.id, exec.id, exec.job.desc, exec.cores, exec.memory, sparkHome)
- exec.job.actor ! ExecutorAdded(exec.id, worker.id, worker.host, exec.cores, exec.memory)
+ exec.job.driver ! ExecutorAdded(exec.id, worker.id, worker.host, exec.cores, exec.memory)
}
def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int,
@@ -221,19 +219,19 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
actorToWorker -= worker.actor
addressToWorker -= worker.actor.path.address
for (exec <- worker.executors.values) {
- exec.job.actor ! ExecutorStateChanged(exec.job.id, exec.id, ExecutorState.LOST, None, None)
+ exec.job.driver ! ExecutorStateChanged(exec.job.id, exec.id, ExecutorState.LOST, None, None)
exec.job.executors -= exec.id
}
}
- def addJob(desc: JobDescription, actor: ActorRef): JobInfo = {
+ def addJob(desc: JobDescription, driver: ActorRef): JobInfo = {
val now = System.currentTimeMillis()
val date = new Date(now)
- val job = new JobInfo(now, newJobId(date), desc, date, actor)
+ val job = new JobInfo(now, newJobId(date), desc, date, driver)
jobs += job
idToJob(job.id) = job
- actorToJob(sender) = job
- addressToJob(sender.path.address) = job
+ actorToJob(driver) = job
+ addressToJob(driver.path.address) = job
return job
}
@@ -242,8 +240,8 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
logInfo("Removing job " + job.id)
jobs -= job
idToJob -= job.id
- actorToJob -= job.actor
- addressToWorker -= job.actor.path.address
+ actorToJob -= job.driver
+ addressToWorker -= job.driver.path.address
completedJobs += job // Remember it in our history
waitingJobs -= job
for (exec <- job.executors.values) {
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 19bf2be118..8b41620d98 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -134,7 +134,9 @@ private[spark] class Worker(
val fullId = jobId + "/" + execId
if (ExecutorState.isFinished(state)) {
val executor = executors(fullId)
- logInfo("Executor " + fullId + " finished with state " + state)
+ logInfo("Executor " + fullId + " finished with state " + state +
+ message.map(" message " + _).getOrElse("") +
+ exitStatus.map(" exitStatus " + _).getOrElse(""))
finishedExecutors(fullId) = executor
executors -= fullId
coresUsed -= executor.cores
@@ -143,9 +145,13 @@ private[spark] class Worker(
case KillExecutor(jobId, execId) =>
val fullId = jobId + "/" + execId
- val executor = executors(fullId)
- logInfo("Asked to kill executor " + fullId)
- executor.kill()
+ executors.get(fullId) match {
+ case Some(executor) =>
+ logInfo("Asked to kill executor " + fullId)
+ executor.kill()
+ case None =>
+ logInfo("Asked to kill unknown executor " + fullId)
+ }
case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
masterDisconnected()
diff --git a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
index 1ef88075ad..818d6d1dda 100644
--- a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
@@ -32,7 +32,11 @@ private[spark] class MesosExecutorBackend(executor: Executor)
logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)
this.driver = driver
val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
- executor.initialize(executorInfo.getExecutorId.getValue, slaveInfo.getHostname, properties)
+ executor.initialize(
+ executorInfo.getExecutorId.getValue,
+ slaveInfo.getHostname,
+ properties
+ )
}
override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
index 50871802ea..e45288ff53 100644
--- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
@@ -16,7 +16,7 @@ import spark.scheduler.cluster.RegisterExecutor
private[spark] class StandaloneExecutorBackend(
executor: Executor,
- masterUrl: String,
+ driverUrl: String,
executorId: String,
hostname: String,
cores: Int)
@@ -24,25 +24,25 @@ private[spark] class StandaloneExecutorBackend(
with ExecutorBackend
with Logging {
- var master: ActorRef = null
+ var driver: ActorRef = null
override def preStart() {
try {
- logInfo("Connecting to master: " + masterUrl)
- master = context.actorFor(masterUrl)
- master ! RegisterExecutor(executorId, hostname, cores)
+ logInfo("Connecting to driver: " + driverUrl)
+ driver = context.actorFor(driverUrl)
+ driver ! RegisterExecutor(executorId, hostname, cores)
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
- context.watch(master) // Doesn't work with remote actors, but useful for testing
+ context.watch(driver) // Doesn't work with remote actors, but useful for testing
} catch {
case e: Exception =>
- logError("Failed to connect to master", e)
+ logError("Failed to connect to driver", e)
System.exit(1)
}
}
override def receive = {
case RegisteredExecutor(sparkProperties) =>
- logInfo("Successfully registered with master")
+ logInfo("Successfully registered with driver")
executor.initialize(executorId, hostname, sparkProperties)
case RegisterExecutorFailed(message) =>
@@ -55,24 +55,24 @@ private[spark] class StandaloneExecutorBackend(
}
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
- master ! StatusUpdate(executorId, taskId, state, data)
+ driver ! StatusUpdate(executorId, taskId, state, data)
}
}
private[spark] object StandaloneExecutorBackend {
- def run(masterUrl: String, executorId: String, hostname: String, cores: Int) {
+ def run(driverUrl: String, executorId: String, hostname: String, cores: Int) {
// Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
// before getting started with all our system properties, etc
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0)
val actor = actorSystem.actorOf(
- Props(new StandaloneExecutorBackend(new Executor, masterUrl, executorId, hostname, cores)),
+ Props(new StandaloneExecutorBackend(new Executor, driverUrl, executorId, hostname, cores)),
name = "Executor")
actorSystem.awaitTermination()
}
def main(args: Array[String]) {
if (args.length != 4) {
- System.err.println("Usage: StandaloneExecutorBackend <master> <executorId> <hostname> <cores>")
+ System.err.println("Usage: StandaloneExecutorBackend <driverUrl> <executorId> <hostname> <cores>")
System.exit(1)
}
run(args(0), args(1), args(2), args(3).toInt)
diff --git a/core/src/main/scala/spark/network/Connection.scala b/core/src/main/scala/spark/network/Connection.scala
index c193bf7c8d..cd5b7d57f3 100644
--- a/core/src/main/scala/spark/network/Connection.scala
+++ b/core/src/main/scala/spark/network/Connection.scala
@@ -12,7 +12,14 @@ import java.net._
private[spark]
-abstract class Connection(val channel: SocketChannel, val selector: Selector) extends Logging {
+abstract class Connection(val channel: SocketChannel, val selector: Selector,
+ val remoteConnectionManagerId: ConnectionManagerId) extends Logging {
+ def this(channel_ : SocketChannel, selector_ : Selector) = {
+ this(channel_, selector_,
+ ConnectionManagerId.fromSocketAddress(
+ channel_.socket.getRemoteSocketAddress().asInstanceOf[InetSocketAddress]
+ ))
+ }
channel.configureBlocking(false)
channel.socket.setTcpNoDelay(true)
@@ -25,7 +32,6 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector) ex
var onKeyInterestChangeCallback: (Connection, Int) => Unit = null
val remoteAddress = getRemoteAddress()
- val remoteConnectionManagerId = ConnectionManagerId.fromSocketAddress(remoteAddress)
def key() = channel.keyFor(selector)
@@ -103,8 +109,9 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector) ex
}
-private[spark] class SendingConnection(val address: InetSocketAddress, selector_ : Selector)
-extends Connection(SocketChannel.open, selector_) {
+private[spark] class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
+ remoteId_ : ConnectionManagerId)
+extends Connection(SocketChannel.open, selector_, remoteId_) {
class Outbox(fair: Int = 0) {
val messages = new Queue[Message]()
diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala
index 2ecd14f536..c7f226044d 100644
--- a/core/src/main/scala/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/spark/network/ConnectionManager.scala
@@ -299,7 +299,8 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
private def sendMessage(connectionManagerId: ConnectionManagerId, message: Message) {
def startNewConnection(): SendingConnection = {
val inetSocketAddress = new InetSocketAddress(connectionManagerId.host, connectionManagerId.port)
- val newConnection = connectionRequests.getOrElseUpdate(connectionManagerId, new SendingConnection(inetSocketAddress, selector))
+ val newConnection = connectionRequests.getOrElseUpdate(connectionManagerId,
+ new SendingConnection(inetSocketAddress, selector, connectionManagerId))
newConnection
}
val lookupKey = ConnectionManagerId.fromSocketAddress(connectionManagerId.toSocketAddress)
diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 6dd3ae003d..9760d23072 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -33,10 +33,11 @@ private[spark] class SparkDeploySchedulerBackend(
override def start() {
super.start()
- val masterUrl = "akka://spark@%s:%s/user/%s".format(
- System.getProperty("spark.master.host"), System.getProperty("spark.master.port"),
+ // The endpoint for executors to talk to us
+ val driverUrl = "akka://spark@%s:%s/user/%s".format(
+ System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
StandaloneSchedulerBackend.ACTOR_NAME)
- val args = Seq(masterUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
+ val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
val command = Command("spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)
val sparkHome = sc.getSparkHome().getOrElse(throw new IllegalArgumentException("must supply spark home for spark standalone"))
val jobDesc = new JobDescription(jobName, maxCores, executorMemory, command, sparkHome)
@@ -54,23 +55,23 @@ private[spark] class SparkDeploySchedulerBackend(
}
}
- def connected(jobId: String) {
+ override def connected(jobId: String) {
logInfo("Connected to Spark cluster with job ID " + jobId)
}
- def disconnected() {
+ override def disconnected() {
if (!stopping) {
logError("Disconnected from Spark cluster!")
scheduler.error("Disconnected from Spark cluster")
}
}
- def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int) {
+ override def executorAdded(executorId: String, workerId: String, host: String, cores: Int, memory: Int) {
logInfo("Granted executor ID %s on host %s with %d cores, %s RAM".format(
- id, host, cores, Utils.memoryMegabytesToString(memory)))
+ executorId, host, cores, Utils.memoryMegabytesToString(memory)))
}
- def executorRemoved(executorId: String, message: String, exitStatus: Option[Int]) {
+ override def executorRemoved(executorId: String, message: String, exitStatus: Option[Int]) {
val reason: ExecutorLossReason = exitStatus match {
case Some(code) => ExecutorExited(code)
case None => SlaveLost(message)
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
index c68f15bdfa..da7dcf4b6b 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
@@ -6,7 +6,7 @@ import spark.util.SerializableBuffer
private[spark] sealed trait StandaloneClusterMessage extends Serializable
-// Master to slaves
+// Driver to executors
private[spark]
case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage
@@ -17,7 +17,7 @@ case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
private[spark]
case class RegisterExecutorFailed(message: String) extends StandaloneClusterMessage
-// Executors to master
+// Executors to driver
private[spark]
case class RegisterExecutor(executorId: String, host: String, cores: Int)
extends StandaloneClusterMessage
@@ -34,6 +34,6 @@ object StatusUpdate {
}
}
-// Internal messages in master
+// Internal messages in driver
private[spark] case object ReviveOffers extends StandaloneClusterMessage
-private[spark] case object StopMaster extends StandaloneClusterMessage
+private[spark] case object StopDriver extends StandaloneClusterMessage
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 69822f568c..082022be1c 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -23,7 +23,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
var totalCoreCount = new AtomicInteger(0)
- class MasterActor(sparkProperties: Seq[(String, String)]) extends Actor {
+ class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
val executorActor = new HashMap[String, ActorRef]
val executorAddress = new HashMap[String, Address]
val executorHost = new HashMap[String, String]
@@ -64,7 +64,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
case ReviveOffers =>
makeOffers()
- case StopMaster =>
+ case StopDriver =>
sender ! true
context.stop(self)
@@ -113,10 +113,10 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
}
}
- var masterActor: ActorRef = null
+ var driverActor: ActorRef = null
val taskIdsOnSlave = new HashMap[String, HashSet[String]]
- def start() {
+ override def start() {
val properties = new ArrayBuffer[(String, String)]
val iterator = System.getProperties.entrySet.iterator
while (iterator.hasNext) {
@@ -126,15 +126,15 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
properties += ((key, value))
}
}
- masterActor = actorSystem.actorOf(
- Props(new MasterActor(properties)), name = StandaloneSchedulerBackend.ACTOR_NAME)
+ driverActor = actorSystem.actorOf(
+ Props(new DriverActor(properties)), name = StandaloneSchedulerBackend.ACTOR_NAME)
}
- def stop() {
+ override def stop() {
try {
- if (masterActor != null) {
+ if (driverActor != null) {
val timeout = 5.seconds
- val future = masterActor.ask(StopMaster)(timeout)
+ val future = driverActor.ask(StopDriver)(timeout)
Await.result(future, timeout)
}
} catch {
@@ -143,11 +143,11 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
}
}
- def reviveOffers() {
- masterActor ! ReviveOffers
+ override def reviveOffers() {
+ driverActor ! ReviveOffers
}
- def defaultParallelism(): Int = math.max(totalCoreCount.get(), 2)
+ override def defaultParallelism(): Int = math.max(totalCoreCount.get(), 2)
}
private[spark] object StandaloneSchedulerBackend {
diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
index 014906b028..7bf56a05d6 100644
--- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
@@ -104,11 +104,11 @@ private[spark] class CoarseMesosSchedulerBackend(
def createCommand(offer: Offer, numCores: Int): CommandInfo = {
val runScript = new File(sparkHome, "run").getCanonicalPath
- val masterUrl = "akka://spark@%s:%s/user/%s".format(
- System.getProperty("spark.master.host"), System.getProperty("spark.master.port"),
+ val driverUrl = "akka://spark@%s:%s/user/%s".format(
+ System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
StandaloneSchedulerBackend.ACTOR_NAME)
val command = "\"%s\" spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
- runScript, masterUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)
+ runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)
val environment = Environment.newBuilder()
sc.executorEnvs.foreach { case (key, value) =>
environment.addVariables(Environment.Variable.newBuilder()
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
index f3467db86b..eab1c60e0b 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -51,7 +51,7 @@ private[spark] class MesosSchedulerBackend(
val taskIdToSlaveId = new HashMap[Long, String]
// An ExecutorInfo for our tasks
- var executorInfo: ExecutorInfo = null
+ var execArgs: Array[Byte] = null
override def start() {
synchronized {
@@ -70,12 +70,11 @@ private[spark] class MesosSchedulerBackend(
}
}.start()
- executorInfo = createExecutorInfo()
waitForRegister()
}
}
- def createExecutorInfo(): ExecutorInfo = {
+ def createExecutorInfo(execId: String): ExecutorInfo = {
val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
"Spark home is not set; set it through the spark.home system " +
"property, the SPARK_HOME environment variable or the SparkContext constructor"))
@@ -97,7 +96,7 @@ private[spark] class MesosSchedulerBackend(
.setEnvironment(environment)
.build()
ExecutorInfo.newBuilder()
- .setExecutorId(ExecutorID.newBuilder().setValue("default").build())
+ .setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
.setCommand(command)
.setData(ByteString.copyFrom(createExecArg()))
.addResources(memory)
@@ -109,17 +108,20 @@ private[spark] class MesosSchedulerBackend(
* containing all the spark.* system properties in the form of (String, String) pairs.
*/
private def createExecArg(): Array[Byte] = {
- val props = new HashMap[String, String]
- val iterator = System.getProperties.entrySet.iterator
- while (iterator.hasNext) {
- val entry = iterator.next
- val (key, value) = (entry.getKey.toString, entry.getValue.toString)
- if (key.startsWith("spark.")) {
- props(key) = value
+ if (execArgs == null) {
+ val props = new HashMap[String, String]
+ val iterator = System.getProperties.entrySet.iterator
+ while (iterator.hasNext) {
+ val entry = iterator.next
+ val (key, value) = (entry.getKey.toString, entry.getValue.toString)
+ if (key.startsWith("spark.")) {
+ props(key) = value
+ }
}
+ // Serialize the map as an array of (String, String) pairs
+ execArgs = Utils.serialize(props.toArray)
}
- // Serialize the map as an array of (String, String) pairs
- return Utils.serialize(props.toArray)
+ return execArgs
}
override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
@@ -216,7 +218,7 @@ private[spark] class MesosSchedulerBackend(
return MesosTaskInfo.newBuilder()
.setTaskId(taskId)
.setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
- .setExecutor(executorInfo)
+ .setExecutor(createExecutorInfo(slaveId))
.setName(task.name)
.addResources(cpuResource)
.setData(ByteString.copyFrom(task.serializedTask))
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index c7ee76f0b7..99324445ca 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -15,52 +15,51 @@ import akka.util.duration._
import spark.{Logging, SparkException, Utils}
-
private[spark] class BlockManagerMaster(
val actorSystem: ActorSystem,
- isMaster: Boolean,
+ isDriver: Boolean,
isLocal: Boolean,
- masterIp: String,
- masterPort: Int)
+ driverIp: String,
+ driverPort: Int)
extends Logging {
val AKKA_RETRY_ATTEMPTS: Int = System.getProperty("spark.akka.num.retries", "3").toInt
val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "3000").toInt
- val MASTER_AKKA_ACTOR_NAME = "BlockMasterManager"
+ val DRIVER_AKKA_ACTOR_NAME = "BlockMasterManager"
val SLAVE_AKKA_ACTOR_NAME = "BlockSlaveManager"
val DEFAULT_MANAGER_IP: String = Utils.localHostName()
val timeout = 10.seconds
- var masterActor: ActorRef = {
- if (isMaster) {
- val masterActor = actorSystem.actorOf(Props(new BlockManagerMasterActor(isLocal)),
- name = MASTER_AKKA_ACTOR_NAME)
+ var driverActor: ActorRef = {
+ if (isDriver) {
+ val driverActor = actorSystem.actorOf(Props(new BlockManagerMasterActor(isLocal)),
+ name = DRIVER_AKKA_ACTOR_NAME)
logInfo("Registered BlockManagerMaster Actor")
- masterActor
+ driverActor
} else {
- val url = "akka://spark@%s:%s/user/%s".format(masterIp, masterPort, MASTER_AKKA_ACTOR_NAME)
+ val url = "akka://spark@%s:%s/user/%s".format(driverIp, driverPort, DRIVER_AKKA_ACTOR_NAME)
logInfo("Connecting to BlockManagerMaster: " + url)
actorSystem.actorFor(url)
}
}
- /** Remove a dead executor from the master actor. This is only called on the master side. */
+ /** Remove a dead executor from the driver actor. This is only called on the driver side. */
def removeExecutor(execId: String) {
tell(RemoveExecutor(execId))
logInfo("Removed " + execId + " successfully in removeExecutor")
}
/**
- * Send the master actor a heart beat from the slave. Returns true if everything works out,
- * false if the master does not know about the given block manager, which means the block
+ * Send the driver actor a heart beat from the slave. Returns true if everything works out,
+ * false if the driver does not know about the given block manager, which means the block
* manager should re-register.
*/
def sendHeartBeat(blockManagerId: BlockManagerId): Boolean = {
- askMasterWithRetry[Boolean](HeartBeat(blockManagerId))
+ askDriverWithReply[Boolean](HeartBeat(blockManagerId))
}
- /** Register the BlockManager's id with the master. */
+ /** Register the BlockManager's id with the driver. */
def registerBlockManager(
blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
logInfo("Trying to register BlockManager")
@@ -74,25 +73,25 @@ private[spark] class BlockManagerMaster(
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long): Boolean = {
- val res = askMasterWithRetry[Boolean](
+ val res = askDriverWithReply[Boolean](
UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize))
logInfo("Updated info of block " + blockId)
res
}
- /** Get locations of the blockId from the master */
+ /** Get locations of the blockId from the driver */
def getLocations(blockId: String): Seq[BlockManagerId] = {
- askMasterWithRetry[Seq[BlockManagerId]](GetLocations(blockId))
+ askDriverWithReply[Seq[BlockManagerId]](GetLocations(blockId))
}
- /** Get locations of multiple blockIds from the master */
+ /** Get locations of multiple blockIds from the driver */
def getLocations(blockIds: Array[String]): Seq[Seq[BlockManagerId]] = {
- askMasterWithRetry[Seq[Seq[BlockManagerId]]](GetLocationsMultipleBlockIds(blockIds))
+ askDriverWithReply[Seq[Seq[BlockManagerId]]](GetLocationsMultipleBlockIds(blockIds))
}
- /** Get ids of other nodes in the cluster from the master */
+ /** Get ids of other nodes in the cluster from the driver */
def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = {
- val result = askMasterWithRetry[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers))
+ val result = askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers))
if (result.length != numPeers) {
throw new SparkException(
"Error getting peers, only got " + result.size + " instead of " + numPeers)
@@ -102,10 +101,10 @@ private[spark] class BlockManagerMaster(
/**
* Remove a block from the slaves that have it. This can only be used to remove
- * blocks that the master knows about.
+ * blocks that the driver knows about.
*/
def removeBlock(blockId: String) {
- askMasterWithRetry(RemoveBlock(blockId))
+ askDriverWithReply(RemoveBlock(blockId))
}
/**
@@ -115,37 +114,37 @@ private[spark] class BlockManagerMaster(
* amount of remaining memory.
*/
def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = {
- askMasterWithRetry[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
+ askDriverWithReply[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
}
def getStorageStatus: Array[StorageStatus] = {
askMasterWithRetry[ArrayBuffer[StorageStatus]](GetStorageStatus).toArray
}
- /** Stop the master actor, called only on the Spark master node */
+ /** Stop the driver actor, called only on the Spark driver node */
def stop() {
- if (masterActor != null) {
+ if (driverActor != null) {
tell(StopBlockManagerMaster)
- masterActor = null
+ driverActor = null
logInfo("BlockManagerMaster stopped")
}
}
/** Send a one-way message to the master actor, to which we expect it to reply with true. */
private def tell(message: Any) {
- if (!askMasterWithRetry[Boolean](message)) {
+ if (!askDriverWithReply[Boolean](message)) {
throw new SparkException("BlockManagerMasterActor returned false, expected true.")
}
}
/**
- * Send a message to the master actor and get its result within a default timeout, or
+ * Send a message to the driver actor and get its result within a default timeout, or
* throw a SparkException if this fails.
*/
- private def askMasterWithRetry[T](message: Any): T = {
+ private def askDriverWithReply[T](message: Any): T = {
// TODO: Consider removing multiple attempts
- if (masterActor == null) {
- throw new SparkException("Error sending message to BlockManager as masterActor is null " +
+ if (driverActor == null) {
+ throw new SparkException("Error sending message to BlockManager as driverActor is null " +
"[message = " + message + "]")
}
var attempts = 0
@@ -153,7 +152,7 @@ private[spark] class BlockManagerMaster(
while (attempts < AKKA_RETRY_ATTEMPTS) {
attempts += 1
try {
- val future = masterActor.ask(message)(timeout)
+ val future = driverActor.ask(message)(timeout)
val result = Await.result(future, timeout)
if (result == null) {
throw new Exception("BlockManagerMaster returned null")
diff --git a/core/src/main/scala/spark/storage/ThreadingTest.scala b/core/src/main/scala/spark/storage/ThreadingTest.scala
index f04c046c31..a70d1c8e78 100644
--- a/core/src/main/scala/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/spark/storage/ThreadingTest.scala
@@ -75,9 +75,9 @@ private[spark] object ThreadingTest {
System.setProperty("spark.kryoserializer.buffer.mb", "1")
val actorSystem = ActorSystem("test")
val serializer = new KryoSerializer
- val masterIp: String = System.getProperty("spark.master.host", "localhost")
- val masterPort: Int = System.getProperty("spark.master.port", "7077").toInt
- val blockManagerMaster = new BlockManagerMaster(actorSystem, true, true, masterIp, masterPort)
+ val driverIp: String = System.getProperty("spark.driver.host", "localhost")
+ val driverPort: Int = System.getProperty("spark.driver.port", "7077").toInt
+ val blockManagerMaster = new BlockManagerMaster(actorSystem, true, true, driverIp, driverPort)
val blockManager = new BlockManager(
"<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024)
val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))
diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java
index f50ba093e9..934e4c2f67 100644
--- a/core/src/test/scala/spark/JavaAPISuite.java
+++ b/core/src/test/scala/spark/JavaAPISuite.java
@@ -46,7 +46,7 @@ public class JavaAPISuite implements Serializable {
sc.stop();
sc = null;
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port");
+ System.clearProperty("spark.driver.port");
}
static class ReverseIntComparator implements Comparator<Integer>, Serializable {
diff --git a/core/src/test/scala/spark/LocalSparkContext.scala b/core/src/test/scala/spark/LocalSparkContext.scala
index b5e31ddae3..ff00dd05dd 100644
--- a/core/src/test/scala/spark/LocalSparkContext.scala
+++ b/core/src/test/scala/spark/LocalSparkContext.scala
@@ -26,7 +26,7 @@ object LocalSparkContext {
def stop(sc: SparkContext) {
sc.stop()
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port")
+ System.clearProperty("spark.driver.port")
}
/** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */
diff --git a/core/src/test/scala/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
index e8fe7ecabc..f4e7ec39fe 100644
--- a/core/src/test/scala/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
@@ -78,10 +78,10 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
test("remote fetch") {
try {
- System.clearProperty("spark.master.host") // In case some previous test had set it
+ System.clearProperty("spark.driver.host") // In case some previous test had set it
val (actorSystem, boundPort) =
AkkaUtils.createActorSystem("test", "localhost", 0)
- System.setProperty("spark.master.port", boundPort.toString)
+ System.setProperty("spark.driver.port", boundPort.toString)
val masterTracker = new MapOutputTracker(actorSystem, true)
val slaveTracker = new MapOutputTracker(actorSystem, false)
masterTracker.registerShuffle(10, 1)
@@ -106,7 +106,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
// failure should be cached
intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
} finally {
- System.clearProperty("spark.master.port")
+ System.clearProperty("spark.driver.port")
}
}
}