author    haitao.yao <yao.erix@gmail.com>  2013-01-30 10:12:09 +0800
committer haitao.yao <yao.erix@gmail.com>  2013-01-30 10:12:09 +0800
commit    dd27f8eef71c02216a61def071899942ed28cbff (patch)
tree      b76d20be8b9ac949a402d8ccba7d44905ec4fe90
parent    4670c99a21debda40a9def5b69fc30da4938d99c (diff)
parent    ccb67ff2cae366973a1a2e7eac57db4e861a4ca7 (diff)
Merge branch 'mesos'
-rw-r--r--  bagel/src/test/scala/bagel/BagelSuite.scala  2
-rw-r--r--  core/src/main/scala/spark/CacheManager.scala  6
-rw-r--r--  core/src/main/scala/spark/MapOutputTracker.scala  10
-rw-r--r--  core/src/main/scala/spark/PairRDDFunctions.scala  4
-rw-r--r--  core/src/main/scala/spark/RDD.scala  146
-rw-r--r--  core/src/main/scala/spark/RDDCheckpointData.scala  19
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala  32
-rw-r--r--  core/src/main/scala/spark/SparkEnv.scala  22
-rw-r--r--  core/src/main/scala/spark/api/java/JavaRDDLike.scala  7
-rw-r--r--  core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala  24
-rw-r--r--  core/src/main/scala/spark/broadcast/Broadcast.scala  6
-rw-r--r--  core/src/main/scala/spark/broadcast/BroadcastFactory.scala  4
-rw-r--r--  core/src/main/scala/spark/broadcast/HttpBroadcast.scala  6
-rw-r--r--  core/src/main/scala/spark/broadcast/MultiTracker.scala  35
-rw-r--r--  core/src/main/scala/spark/broadcast/TreeBroadcast.scala  52
-rw-r--r--  core/src/main/scala/spark/deploy/LocalSparkCluster.scala  27
-rw-r--r--  core/src/main/scala/spark/deploy/client/ClientListener.scala  4
-rw-r--r--  core/src/main/scala/spark/deploy/master/JobInfo.scala  2
-rw-r--r--  core/src/main/scala/spark/deploy/master/Master.scala  18
-rw-r--r--  core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala  24
-rw-r--r--  core/src/main/scala/spark/rdd/CartesianRDD.scala  12
-rw-r--r--  core/src/main/scala/spark/rdd/CheckpointRDD.scala  61
-rw-r--r--  core/src/main/scala/spark/rdd/CoalescedRDD.scala  14
-rw-r--r--  core/src/main/scala/spark/rdd/MappedRDD.scala  6
-rw-r--r--  core/src/main/scala/spark/rdd/PartitionPruningRDD.scala  13
-rw-r--r--  core/src/main/scala/spark/rdd/ShuffledRDD.scala  8
-rw-r--r--  core/src/main/scala/spark/rdd/UnionRDD.scala  14
-rw-r--r--  core/src/main/scala/spark/rdd/ZippedRDD.scala  7
-rw-r--r--  core/src/main/scala/spark/scheduler/DAGScheduler.scala  28
-rw-r--r--  core/src/main/scala/spark/scheduler/Stage.scala  3
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala  17
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala  8
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala  24
-rw-r--r--  core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala  6
-rw-r--r--  core/src/main/scala/spark/storage/BlockManager.scala  14
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerMaster.scala  69
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerMasterActor.scala  2
-rw-r--r--  core/src/main/scala/spark/storage/ThreadingTest.scala  6
-rw-r--r--  core/src/main/scala/spark/util/MetadataCleaner.scala  9
-rw-r--r--  core/src/test/scala/spark/AccumulatorSuite.scala  6
-rw-r--r--  core/src/test/scala/spark/CheckpointSuite.scala  21
-rw-r--r--  core/src/test/scala/spark/JavaAPISuite.java  2
-rw-r--r--  core/src/test/scala/spark/LocalSparkContext.scala  2
-rw-r--r--  core/src/test/scala/spark/MapOutputTrackerSuite.scala  6
-rw-r--r--  docs/configuration.md  12
-rw-r--r--  python/pyspark/tests.py  2
-rw-r--r--  repl/src/test/scala/spark/repl/ReplSuite.scala  2
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala  8
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala  2
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala  4
-rw-r--r--  streaming/src/test/java/spark/streaming/JavaAPISuite.java  2
-rw-r--r--  streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala  2
-rw-r--r--  streaming/src/test/scala/spark/streaming/CheckpointSuite.scala  2
-rw-r--r--  streaming/src/test/scala/spark/streaming/FailureSuite.scala  2
-rw-r--r--  streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala  2
-rw-r--r--  streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala  2
56 files changed, 442 insertions, 408 deletions
diff --git a/bagel/src/test/scala/bagel/BagelSuite.scala b/bagel/src/test/scala/bagel/BagelSuite.scala
index ca59f46843..3c2f9c4616 100644
--- a/bagel/src/test/scala/bagel/BagelSuite.scala
+++ b/bagel/src/test/scala/bagel/BagelSuite.scala
@@ -23,7 +23,7 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter {
sc = null
}
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port")
+ System.clearProperty("spark.driver.port")
}
test("halting by voting") {
diff --git a/core/src/main/scala/spark/CacheManager.scala b/core/src/main/scala/spark/CacheManager.scala
index a0b53fd9d6..711435c333 100644
--- a/core/src/main/scala/spark/CacheManager.scala
+++ b/core/src/main/scala/spark/CacheManager.scala
@@ -10,9 +10,9 @@ import spark.storage.{BlockManager, StorageLevel}
private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
private val loading = new HashSet[String]
- /** Gets or computes an RDD split. Used by RDD.iterator() when a RDD is cached. */
+ /** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
def getOrCompute[T](rdd: RDD[T], split: Split, context: TaskContext, storageLevel: StorageLevel)
- : Iterator[T] = {
+ : Iterator[T] = {
val key = "rdd_%d_%d".format(rdd.id, split.index)
logInfo("Cache key is " + key)
blockManager.get(key) match {
@@ -50,7 +50,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
// If we got here, we have to load the split
val elements = new ArrayBuffer[Any]
logInfo("Computing partition " + split)
- elements ++= rdd.compute(split, context)
+ elements ++= rdd.computeOrReadCheckpoint(split, context)
// Try to put this block in the blockManager
blockManager.put(key, elements, storageLevel, true)
return elements.iterator.asInstanceOf[Iterator[T]]
diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
index c1f012b419..aaf433b324 100644
--- a/core/src/main/scala/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/spark/MapOutputTracker.scala
@@ -38,10 +38,7 @@ private[spark] class MapOutputTrackerActor(tracker: MapOutputTracker) extends Ac
}
}
-private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolean) extends Logging {
- val ip: String = System.getProperty("spark.master.host", "localhost")
- val port: Int = System.getProperty("spark.master.port", "7077").toInt
- val actorName: String = "MapOutputTracker"
+private[spark] class MapOutputTracker(actorSystem: ActorSystem, isDriver: Boolean) extends Logging {
val timeout = 10.seconds
@@ -56,11 +53,14 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
var cacheGeneration = generation
val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]
- var trackerActor: ActorRef = if (isMaster) {
+ val actorName: String = "MapOutputTracker"
+ var trackerActor: ActorRef = if (isDriver) {
val actor = actorSystem.actorOf(Props(new MapOutputTrackerActor(this)), name = actorName)
logInfo("Registered MapOutputTrackerActor actor")
actor
} else {
+ val ip = System.getProperty("spark.driver.host", "localhost")
+ val port = System.getProperty("spark.driver.port", "7077").toInt
val url = "akka://spark@%s:%s/user/%s".format(ip, port, actorName)
actorSystem.actorFor(url)
}
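With this change the executor side no longer hard-codes spark.master.* properties; it rebuilds the tracker's Akka URL from the driver's advertised address. A minimal sketch of the two sides, assuming the same property names used above (the host value is hypothetical):

    // Driver side: SparkContext publishes its address for executors to read.
    System.setProperty("spark.driver.host", "192.168.1.10")
    System.setProperty("spark.driver.port", "7077")

    // Executor side: rebuild the Akka URL of the MapOutputTracker actor on the driver.
    val host = System.getProperty("spark.driver.host", "localhost")
    val port = System.getProperty("spark.driver.port", "7077").toInt
    val url  = "akka://spark@%s:%s/user/%s".format(host, port, "MapOutputTracker")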
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 53b051f1c5..231e23a7de 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -649,9 +649,7 @@ class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
}
private[spark]
-class MappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => U)
- extends RDD[(K, U)](prev) {
-
+class MappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => U) extends RDD[(K, U)](prev) {
override def getSplits = firstParent[(K, V)].splits
override val partitioner = firstParent[(K, V)].partitioner
override def compute(split: Split, context: TaskContext) =
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 0d3857f9dd..210404d540 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -1,27 +1,17 @@
package spark
-import java.io.{ObjectOutputStream, IOException, EOFException, ObjectInputStream}
import java.net.URL
import java.util.{Date, Random}
import java.util.{HashMap => JHashMap}
-import java.util.concurrent.atomic.AtomicLong
import scala.collection.Map
import scala.collection.JavaConversions.mapAsScalaMap
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
-import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.io.Text
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapred.FileOutputCommitter
-import org.apache.hadoop.mapred.HadoopWriter
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapred.OutputCommitter
-import org.apache.hadoop.mapred.OutputFormat
-import org.apache.hadoop.mapred.SequenceFileOutputFormat
import org.apache.hadoop.mapred.TextOutputFormat
import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
@@ -30,7 +20,6 @@ import spark.partial.BoundedDouble
import spark.partial.CountEvaluator
import spark.partial.GroupedCountEvaluator
import spark.partial.PartialResult
-import spark.rdd.BlockRDD
import spark.rdd.CartesianRDD
import spark.rdd.FilteredRDD
import spark.rdd.FlatMappedRDD
@@ -73,11 +62,11 @@ import SparkContext._
* on RDD internals.
*/
abstract class RDD[T: ClassManifest](
- @transient var sc: SparkContext,
- var dependencies_ : List[Dependency[_]]
+ @transient private var sc: SparkContext,
+ @transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging {
-
+ /** Construct an RDD with just a one-to-one dependency on one parent */
def this(@transient oneParent: RDD[_]) =
this(oneParent.context , List(new OneToOneDependency(oneParent)))
@@ -85,25 +74,27 @@ abstract class RDD[T: ClassManifest](
// Methods that should be implemented by subclasses of RDD
// =======================================================================
- /** Function for computing a given partition. */
+ /** Implemented by subclasses to compute a given partition. */
def compute(split: Split, context: TaskContext): Iterator[T]
- /** Set of partitions in this RDD. */
- protected def getSplits(): Array[Split]
+ /**
+ * Implemented by subclasses to return the set of partitions in this RDD. This method will only
+ * be called once, so it is safe to implement a time-consuming computation in it.
+ */
+ protected def getSplits: Array[Split]
- /** How this RDD depends on any parent RDDs. */
- protected def getDependencies(): List[Dependency[_]] = dependencies_
+ /**
+ * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
+ * be called once, so it is safe to implement a time-consuming computation in it.
+ */
+ protected def getDependencies: Seq[Dependency[_]] = deps
- /** A friendly name for this RDD */
- var name: String = null
-
/** Optionally overridden by subclasses to specify placement preferences. */
protected def getPreferredLocations(split: Split): Seq[String] = Nil
/** Optionally overridden by subclasses to specify how they are partitioned. */
val partitioner: Option[Partitioner] = None
-
// =======================================================================
// Methods and fields available on all RDDs
// =======================================================================
@@ -111,13 +102,16 @@ abstract class RDD[T: ClassManifest](
/** A unique ID for this RDD (within its SparkContext). */
val id = sc.newRddId()
+ /** A friendly name for this RDD */
+ var name: String = null
+
/** Assign a name to this RDD */
def setName(_name: String) = {
name = _name
this
}
- /**
+ /**
* Set this RDD's storage level to persist its values across operations after the first time
* it is computed. Can only be called once on each RDD.
*/
@@ -142,15 +136,24 @@ abstract class RDD[T: ClassManifest](
/** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */
def getStorageLevel = storageLevel
+ // Our dependencies and splits will be gotten by calling subclass's methods below, and will
+ // be overwritten when we're checkpointed
+ private var dependencies_ : Seq[Dependency[_]] = null
+ @transient private var splits_ : Array[Split] = null
+
+ /** An Option holding our checkpoint RDD, if we are checkpointed */
+ private def checkpointRDD: Option[RDD[T]] = checkpointData.flatMap(_.checkpointRDD)
+
/**
- * Get the preferred location of a split, taking into account whether the
+ * Get the list of dependencies of this RDD, taking into account whether the
* RDD is checkpointed or not.
*/
- final def preferredLocations(split: Split): Seq[String] = {
- if (isCheckpointed) {
- checkpointData.get.getPreferredLocations(split)
- } else {
- getPreferredLocations(split)
+ final def dependencies: Seq[Dependency[_]] = {
+ checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
+ if (dependencies_ == null) {
+ dependencies_ = getDependencies
+ }
+ dependencies_
}
}
@@ -159,22 +162,21 @@ abstract class RDD[T: ClassManifest](
* RDD is checkpointed or not.
*/
final def splits: Array[Split] = {
- if (isCheckpointed) {
- checkpointData.get.getSplits
- } else {
- getSplits
+ checkpointRDD.map(_.splits).getOrElse {
+ if (splits_ == null) {
+ splits_ = getSplits
+ }
+ splits_
}
}
/**
- * Get the list of dependencies of this RDD, taking into account whether the
+ * Get the preferred location of a split, taking into account whether the
* RDD is checkpointed or not.
*/
- final def dependencies: List[Dependency[_]] = {
- if (isCheckpointed) {
- dependencies_
- } else {
- getDependencies
+ final def preferredLocations(split: Split): Seq[String] = {
+ checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
+ getPreferredLocations(split)
}
}
@@ -184,11 +186,20 @@ abstract class RDD[T: ClassManifest](
* subclasses of RDD.
*/
final def iterator(split: Split, context: TaskContext): Iterator[T] = {
- if (isCheckpointed) {
- checkpointData.get.iterator(split, context)
- } else if (storageLevel != StorageLevel.NONE) {
+ if (storageLevel != StorageLevel.NONE) {
SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
} else {
+ computeOrReadCheckpoint(split, context)
+ }
+ }
+
+ /**
+ * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
+ */
+ private[spark] def computeOrReadCheckpoint(split: Split, context: TaskContext): Iterator[T] = {
+ if (isCheckpointed) {
+ firstParent[T].iterator(split, context)
+ } else {
compute(split, context)
}
}
@@ -578,15 +589,15 @@ abstract class RDD[T: ClassManifest](
/**
* Return whether this RDD has been checkpointed or not
*/
- def isCheckpointed(): Boolean = {
- if (checkpointData.isDefined) checkpointData.get.isCheckpointed() else false
+ def isCheckpointed: Boolean = {
+ checkpointData.map(_.isCheckpointed).getOrElse(false)
}
/**
* Gets the name of the file to which this RDD was checkpointed
*/
- def getCheckpointFile(): Option[String] = {
- if (checkpointData.isDefined) checkpointData.get.getCheckpointFile() else None
+ def getCheckpointFile: Option[String] = {
+ checkpointData.flatMap(_.getCheckpointFile)
}
// =======================================================================
@@ -611,31 +622,52 @@ abstract class RDD[T: ClassManifest](
def context = sc
/**
- * Performs the checkpointing of this RDD by saving this . It is called by the DAGScheduler
+ * Performs the checkpointing of this RDD by saving this. It is called by the DAGScheduler
* after a job using this RDD has completed (therefore the RDD has been materialized and
* potentially stored in memory). doCheckpoint() is called recursively on the parent RDDs.
*/
- protected[spark] def doCheckpoint() {
- if (checkpointData.isDefined) checkpointData.get.doCheckpoint()
- dependencies.foreach(_.rdd.doCheckpoint())
+ private[spark] def doCheckpoint() {
+ if (checkpointData.isDefined) {
+ checkpointData.get.doCheckpoint()
+ } else {
+ dependencies.foreach(_.rdd.doCheckpoint())
+ }
}
/**
- * Changes the dependencies of this RDD from its original parents to the new RDD
- * (`newRDD`) created from the checkpoint file.
+ * Changes the dependencies of this RDD from its original parents to a new RDD (`newRDD`)
+ * created from the checkpoint file, and forget its old dependencies and splits.
*/
- protected[spark] def changeDependencies(newRDD: RDD[_]) {
+ private[spark] def markCheckpointed(checkpointRDD: RDD[_]) {
clearDependencies()
- dependencies_ = List(new OneToOneDependency(newRDD))
+ dependencies_ = null
+ splits_ = null
+ deps = null // Forget the constructor argument for dependencies too
}
/**
* Clears the dependencies of this RDD. This method must ensure that all references
* to the original parent RDDs is removed to enable the parent RDDs to be garbage
* collected. Subclasses of RDD may override this method for implementing their own cleaning
- * logic. See [[spark.rdd.UnionRDD]] and [[spark.rdd.ShuffledRDD]] to get a better idea.
+ * logic. See [[spark.rdd.UnionRDD]] for an example.
*/
- protected[spark] def clearDependencies() {
+ protected def clearDependencies() {
dependencies_ = null
}
+
+ /** A description of this RDD and its recursive dependencies for debugging. */
+ def toDebugString(): String = {
+ def debugString(rdd: RDD[_], prefix: String = ""): Seq[String] = {
+ Seq(prefix + rdd + " (" + rdd.splits.size + " splits)") ++
+ rdd.dependencies.flatMap(d => debugString(d.rdd, prefix + " "))
+ }
+ debugString(this).mkString("\n")
+ }
+
+ override def toString(): String = "%s%s[%d] at %s".format(
+ Option(name).map(_ + " ").getOrElse(""),
+ getClass.getSimpleName,
+ id,
+ origin)
+
}
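For orientation, a minimal sketch of how the reworked checkpoint path and the new toDebugString are meant to be used from user code, assuming a SparkContext named sc and a checkpoint directory already configured on it:

    val rdd = sc.parallelize(1 to 1000, 4).map(_ * 2).filter(_ % 3 == 0)
    rdd.checkpoint()                 // mark for checkpointing (materialized on the next job)
    rdd.count()                      // runs a job; doCheckpoint() is invoked afterwards
    println(rdd.isCheckpointed)      // true once markCheckpointed() replaced the old parents
    println(rdd.getCheckpointFile)   // Some(<checkpoint directory of the new CheckpointRDD>)
    println(rdd.toDebugString())     // one line per RDD in the lineage, indented by depth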
diff --git a/core/src/main/scala/spark/RDDCheckpointData.scala b/core/src/main/scala/spark/RDDCheckpointData.scala
index 18df530b7d..a4a4ebaf53 100644
--- a/core/src/main/scala/spark/RDDCheckpointData.scala
+++ b/core/src/main/scala/spark/RDDCheckpointData.scala
@@ -20,7 +20,7 @@ private[spark] object CheckpointState extends Enumeration {
* of the checkpointed RDD.
*/
private[spark] class RDDCheckpointData[T: ClassManifest](rdd: RDD[T])
-extends Logging with Serializable {
+ extends Logging with Serializable {
import CheckpointState._
@@ -31,7 +31,7 @@ extends Logging with Serializable {
@transient var cpFile: Option[String] = None
// The CheckpointRDD created from the checkpoint file, that is, the new parent the associated RDD.
- @transient var cpRDD: Option[RDD[T]] = None
+ var cpRDD: Option[RDD[T]] = None
// Mark the RDD for checkpointing
def markForCheckpoint() {
@@ -41,12 +41,12 @@ extends Logging with Serializable {
}
// Is the RDD already checkpointed
- def isCheckpointed(): Boolean = {
+ def isCheckpointed: Boolean = {
RDDCheckpointData.synchronized { cpState == Checkpointed }
}
// Get the file to which this RDD was checkpointed to as an Option
- def getCheckpointFile(): Option[String] = {
+ def getCheckpointFile: Option[String] = {
RDDCheckpointData.synchronized { cpFile }
}
@@ -71,7 +71,7 @@ extends Logging with Serializable {
RDDCheckpointData.synchronized {
cpFile = Some(path)
cpRDD = Some(newRDD)
- rdd.changeDependencies(newRDD)
+ rdd.markCheckpointed(newRDD) // Update the RDD's dependencies and splits
cpState = Checkpointed
RDDCheckpointData.clearTaskCaches()
logInfo("Done checkpointing RDD " + rdd.id + ", new parent is RDD " + newRDD.id)
@@ -79,7 +79,7 @@ extends Logging with Serializable {
}
// Get preferred location of a split after checkpointing
- def getPreferredLocations(split: Split) = {
+ def getPreferredLocations(split: Split): Seq[String] = {
RDDCheckpointData.synchronized {
cpRDD.get.preferredLocations(split)
}
@@ -91,9 +91,10 @@ extends Logging with Serializable {
}
}
- // Get iterator. This is called at the worker nodes.
- def iterator(split: Split, context: TaskContext): Iterator[T] = {
- rdd.firstParent[T].iterator(split, context)
+ def checkpointRDD: Option[RDD[T]] = {
+ RDDCheckpointData.synchronized {
+ cpRDD
+ }
}
}
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 77036c1275..b0d4b58240 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -69,12 +69,12 @@ class SparkContext(
// Ensure logging is initialized before we spawn any threads
initLogging()
- // Set Spark master host and port system properties
- if (System.getProperty("spark.master.host") == null) {
- System.setProperty("spark.master.host", Utils.localIpAddress)
+ // Set Spark driver host and port system properties
+ if (System.getProperty("spark.driver.host") == null) {
+ System.setProperty("spark.driver.host", Utils.localIpAddress)
}
- if (System.getProperty("spark.master.port") == null) {
- System.setProperty("spark.master.port", "0")
+ if (System.getProperty("spark.driver.port") == null) {
+ System.setProperty("spark.driver.port", "0")
}
private val isLocal = (master == "local" || master.startsWith("local["))
@@ -82,15 +82,15 @@ class SparkContext(
// Create the Spark execution environment (cache, map output tracker, etc)
private[spark] val env = SparkEnv.createFromSystemProperties(
"<driver>",
- System.getProperty("spark.master.host"),
- System.getProperty("spark.master.port").toInt,
+ System.getProperty("spark.driver.host"),
+ System.getProperty("spark.driver.port").toInt,
true,
isLocal)
SparkEnv.set(env)
// Start the BlockManager UI
private[spark] val ui = new BlockManagerUI(
- env.actorSystem, env.blockManager.master.masterActor, this)
+ env.actorSystem, env.blockManager.master.driverActor, this)
ui.start()
// Used to store a URL for each static file/jar together with the file's local timestamp
@@ -410,14 +410,14 @@ class SparkContext(
/**
* Create an [[spark.Accumulator]] variable of a given type, which tasks can "add" values
- * to using the `+=` method. Only the master can access the accumulator's `value`.
+ * to using the `+=` method. Only the driver can access the accumulator's `value`.
*/
def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]) =
new Accumulator(initialValue, param)
/**
* Create an [[spark.Accumulable]] shared variable, to which tasks can add values with `+=`.
- * Only the master can access the accumuable's `value`.
+ * Only the driver can access the accumuable's `value`.
* @tparam T accumulator type
* @tparam R type that can be added to the accumulator
*/
@@ -545,7 +545,7 @@ class SparkContext(
/**
* Run a function on a given set of partitions in an RDD and return the results. This is the main
* entry point to the scheduler, by which all actions get launched. The allowLocal flag specifies
- * whether the scheduler can run the computation on the master rather than shipping it out to the
+ * whether the scheduler can run the computation on the driver rather than shipping it out to the
* cluster, for short actions like first().
*/
def runJob[T, U: ClassManifest](
@@ -673,6 +673,16 @@ object SparkContext {
def zero(initialValue: Int) = 0
}
+ implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
+ def addInPlace(t1: Long, t2: Long) = t1 + t2
+ def zero(initialValue: Long) = 0l
+ }
+
+ implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
+ def addInPlace(t1: Float, t2: Float) = t1 + t2
+ def zero(initialValue: Float) = 0f
+ }
+
// TODO: Add AccumulatorParams for other types, e.g. lists and strings
implicit def rddToPairRDDFunctions[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]) =
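With the new LongAccumulatorParam and FloatAccumulatorParam implicits, accumulators over those types no longer need a hand-written param. A minimal sketch, assuming a SparkContext named sc:

    val bytesSeen = sc.accumulator(0L)     // resolves the new LongAccumulatorParam
    val ratioSum  = sc.accumulator(0.0f)   // resolves the new FloatAccumulatorParam

    sc.parallelize(Seq("a", "bb", "ccc")).foreach { s =>
      bytesSeen += s.length.toLong
      ratioSum  += 0.5f
    }
    println(bytesSeen.value)   // only the driver may read an accumulator's value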
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 0c094edcf3..d2193ae72b 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -62,15 +62,15 @@ object SparkEnv extends Logging {
executorId: String,
hostname: String,
port: Int,
- isMaster: Boolean,
+ isDriver: Boolean,
isLocal: Boolean): SparkEnv = {
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port)
- // Bit of a hack: If this is the master and our port was 0 (meaning bind to any free port),
- // figure out which port number Akka actually bound to and set spark.master.port to it.
- if (isMaster && port == 0) {
- System.setProperty("spark.master.port", boundPort.toString)
+ // Bit of a hack: If this is the driver and our port was 0 (meaning bind to any free port),
+ // figure out which port number Akka actually bound to and set spark.driver.port to it.
+ if (isDriver && port == 0) {
+ System.setProperty("spark.driver.port", boundPort.toString)
}
val classLoader = Thread.currentThread.getContextClassLoader
@@ -84,22 +84,22 @@ object SparkEnv extends Logging {
val serializer = instantiateClass[Serializer]("spark.serializer", "spark.JavaSerializer")
- val masterIp: String = System.getProperty("spark.master.host", "localhost")
- val masterPort: Int = System.getProperty("spark.master.port", "7077").toInt
+ val driverIp: String = System.getProperty("spark.driver.host", "localhost")
+ val driverPort: Int = System.getProperty("spark.driver.port", "7077").toInt
val blockManagerMaster = new BlockManagerMaster(
- actorSystem, isMaster, isLocal, masterIp, masterPort)
+ actorSystem, isDriver, isLocal, driverIp, driverPort)
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer)
val connectionManager = blockManager.connectionManager
- val broadcastManager = new BroadcastManager(isMaster)
+ val broadcastManager = new BroadcastManager(isDriver)
val closureSerializer = instantiateClass[Serializer](
"spark.closure.serializer", "spark.JavaSerializer")
val cacheManager = new CacheManager(blockManager)
- val mapOutputTracker = new MapOutputTracker(actorSystem, isMaster)
+ val mapOutputTracker = new MapOutputTracker(actorSystem, isDriver)
val shuffleFetcher = instantiateClass[ShuffleFetcher](
"spark.shuffle.fetcher", "spark.BlockStoreShuffleFetcher")
@@ -111,7 +111,7 @@ object SparkEnv extends Logging {
// Set the sparkFiles directory, used when downloading dependencies. In local mode,
// this is a temporary directory; in distributed mode, this is the executor's current working
// directory.
- val sparkFilesDir: String = if (isMaster) {
+ val sparkFilesDir: String = if (isDriver) {
Utils.createTempDir().getAbsolutePath
} else {
"."
diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
index 4c95c989b5..60025b459c 100644
--- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
@@ -319,7 +319,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround
/**
* Return whether this RDD has been checkpointed or not
*/
- def isCheckpointed(): Boolean = rdd.isCheckpointed()
+ def isCheckpointed: Boolean = rdd.isCheckpointed
/**
* Gets the name of the file to which this RDD was checkpointed
@@ -330,4 +330,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround
case _ => Optional.absent()
}
}
+
+ /** A description of this RDD and its recursive dependencies for debugging. */
+ def toDebugString(): String = {
+ rdd.toDebugString()
+ }
}
diff --git a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
index 386f505f2a..adcb2d2415 100644
--- a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
@@ -31,7 +31,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
@transient var totalBlocks = -1
@transient var hasBlocks = new AtomicInteger(0)
- // Used ONLY by Master to track how many unique blocks have been sent out
+ // Used ONLY by driver to track how many unique blocks have been sent out
@transient var sentBlocks = new AtomicInteger(0)
@transient var listenPortLock = new Object
@@ -42,7 +42,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
@transient var serveMR: ServeMultipleRequests = null
- // Used only in Master
+ // Used only in driver
@transient var guideMR: GuideMultipleRequests = null
// Used only in Workers
@@ -99,14 +99,14 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
}
// Must always come AFTER listenPort is created
- val masterSource =
+ val driverSource =
SourceInfo(hostAddress, listenPort, totalBlocks, totalBytes)
hasBlocksBitVector.synchronized {
- masterSource.hasBlocksBitVector = hasBlocksBitVector
+ driverSource.hasBlocksBitVector = hasBlocksBitVector
}
// In the beginning, this is the only known source to Guide
- listOfSources += masterSource
+ listOfSources += driverSource
// Register with the Tracker
MultiTracker.registerBroadcast(id,
@@ -122,7 +122,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
case None =>
logInfo("Started reading broadcast variable " + id)
- // Initializing everything because Master will only send null/0 values
+ // Initializing everything because driver will only send null/0 values
// Only the 1st worker in a node can be here. Others will get from cache
initializeWorkerVariables()
@@ -151,7 +151,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
}
}
- // Initialize variables in the worker node. Master sends everything as 0/null
+ // Initialize variables in the worker node. Driver sends everything as 0/null
private def initializeWorkerVariables() {
arrayOfBlocks = null
hasBlocksBitVector = null
@@ -248,7 +248,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
// Receive source information from Guide
var suitableSources =
oisGuide.readObject.asInstanceOf[ListBuffer[SourceInfo]]
- logDebug("Received suitableSources from Master " + suitableSources)
+ logDebug("Received suitableSources from Driver " + suitableSources)
addToListOfSources(suitableSources)
@@ -532,7 +532,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
oosSource.writeObject(blockToAskFor)
oosSource.flush()
- // CHANGED: Master might send some other block than the one
+ // CHANGED: Driver might send some other block than the one
// requested to ensure fast spreading of all blocks.
val recvStartTime = System.currentTimeMillis
val bcBlock = oisSource.readObject.asInstanceOf[BroadcastBlock]
@@ -982,9 +982,9 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
// Receive which block to send
var blockToSend = ois.readObject.asInstanceOf[Int]
- // If it is master AND at least one copy of each block has not been
+ // If it is driver AND at least one copy of each block has not been
// sent out already, MODIFY blockToSend
- if (MultiTracker.isMaster && sentBlocks.get < totalBlocks) {
+ if (MultiTracker.isDriver && sentBlocks.get < totalBlocks) {
blockToSend = sentBlocks.getAndIncrement
}
@@ -1031,7 +1031,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
private[spark] class BitTorrentBroadcastFactory
extends BroadcastFactory {
- def initialize(isMaster: Boolean) { MultiTracker.initialize(isMaster) }
+ def initialize(isDriver: Boolean) { MultiTracker.initialize(isDriver) }
def newBroadcast[T](value_ : T, isLocal: Boolean, id: Long) =
new BitTorrentBroadcast[T](value_, isLocal, id)
diff --git a/core/src/main/scala/spark/broadcast/Broadcast.scala b/core/src/main/scala/spark/broadcast/Broadcast.scala
index 2ffe7f741d..415bde5d67 100644
--- a/core/src/main/scala/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/spark/broadcast/Broadcast.scala
@@ -15,7 +15,7 @@ abstract class Broadcast[T](private[spark] val id: Long) extends Serializable {
}
private[spark]
-class BroadcastManager(val isMaster_ : Boolean) extends Logging with Serializable {
+class BroadcastManager(val _isDriver: Boolean) extends Logging with Serializable {
private var initialized = false
private var broadcastFactory: BroadcastFactory = null
@@ -33,7 +33,7 @@ class BroadcastManager(val isMaster_ : Boolean) extends Logging with Serializabl
Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
// Initialize appropriate BroadcastFactory and BroadcastObject
- broadcastFactory.initialize(isMaster)
+ broadcastFactory.initialize(isDriver)
initialized = true
}
@@ -49,5 +49,5 @@ class BroadcastManager(val isMaster_ : Boolean) extends Logging with Serializabl
def newBroadcast[T](value_ : T, isLocal: Boolean) =
broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
- def isMaster = isMaster_
+ def isDriver = _isDriver
}
diff --git a/core/src/main/scala/spark/broadcast/BroadcastFactory.scala b/core/src/main/scala/spark/broadcast/BroadcastFactory.scala
index ab6d302827..5c6184c3c7 100644
--- a/core/src/main/scala/spark/broadcast/BroadcastFactory.scala
+++ b/core/src/main/scala/spark/broadcast/BroadcastFactory.scala
@@ -7,7 +7,7 @@ package spark.broadcast
* entire Spark job.
*/
private[spark] trait BroadcastFactory {
- def initialize(isMaster: Boolean): Unit
- def newBroadcast[T](value_ : T, isLocal: Boolean, id: Long): Broadcast[T]
+ def initialize(isDriver: Boolean): Unit
+ def newBroadcast[T](value: T, isLocal: Boolean, id: Long): Broadcast[T]
def stop(): Unit
}
diff --git a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
index 8e490e6bad..7e30b8f7d2 100644
--- a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
@@ -48,7 +48,7 @@ extends Broadcast[T](id) with Logging with Serializable {
}
private[spark] class HttpBroadcastFactory extends BroadcastFactory {
- def initialize(isMaster: Boolean) { HttpBroadcast.initialize(isMaster) }
+ def initialize(isDriver: Boolean) { HttpBroadcast.initialize(isDriver) }
def newBroadcast[T](value_ : T, isLocal: Boolean, id: Long) =
new HttpBroadcast[T](value_, isLocal, id)
@@ -69,12 +69,12 @@ private object HttpBroadcast extends Logging {
private val cleaner = new MetadataCleaner("HttpBroadcast", cleanup)
- def initialize(isMaster: Boolean) {
+ def initialize(isDriver: Boolean) {
synchronized {
if (!initialized) {
bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
compress = System.getProperty("spark.broadcast.compress", "true").toBoolean
- if (isMaster) {
+ if (isDriver) {
createServer()
}
serverUri = System.getProperty("spark.httpBroadcast.uri")
diff --git a/core/src/main/scala/spark/broadcast/MultiTracker.scala b/core/src/main/scala/spark/broadcast/MultiTracker.scala
index 5e76dedb94..3fd77af73f 100644
--- a/core/src/main/scala/spark/broadcast/MultiTracker.scala
+++ b/core/src/main/scala/spark/broadcast/MultiTracker.scala
@@ -23,25 +23,24 @@ extends Logging {
var ranGen = new Random
private var initialized = false
- private var isMaster_ = false
+ private var _isDriver = false
private var stopBroadcast = false
private var trackMV: TrackMultipleValues = null
- def initialize(isMaster__ : Boolean) {
+ def initialize(__isDriver: Boolean) {
synchronized {
if (!initialized) {
+ _isDriver = __isDriver
- isMaster_ = isMaster__
-
- if (isMaster) {
+ if (isDriver) {
trackMV = new TrackMultipleValues
trackMV.setDaemon(true)
trackMV.start()
- // Set masterHostAddress to the master's IP address for the slaves to read
- System.setProperty("spark.MultiTracker.MasterHostAddress", Utils.localIpAddress)
+ // Set DriverHostAddress to the driver's IP address for the slaves to read
+ System.setProperty("spark.MultiTracker.DriverHostAddress", Utils.localIpAddress)
}
initialized = true
@@ -54,10 +53,10 @@ extends Logging {
}
// Load common parameters
- private var MasterHostAddress_ = System.getProperty(
- "spark.MultiTracker.MasterHostAddress", "")
- private var MasterTrackerPort_ = System.getProperty(
- "spark.broadcast.masterTrackerPort", "11111").toInt
+ private var DriverHostAddress_ = System.getProperty(
+ "spark.MultiTracker.DriverHostAddress", "")
+ private var DriverTrackerPort_ = System.getProperty(
+ "spark.broadcast.driverTrackerPort", "11111").toInt
private var BlockSize_ = System.getProperty(
"spark.broadcast.blockSize", "4096").toInt * 1024
private var MaxRetryCount_ = System.getProperty(
@@ -91,11 +90,11 @@ extends Logging {
private var EndGameFraction_ = System.getProperty(
"spark.broadcast.endGameFraction", "0.95").toDouble
- def isMaster = isMaster_
+ def isDriver = _isDriver
// Common config params
- def MasterHostAddress = MasterHostAddress_
- def MasterTrackerPort = MasterTrackerPort_
+ def DriverHostAddress = DriverHostAddress_
+ def DriverTrackerPort = DriverTrackerPort_
def BlockSize = BlockSize_
def MaxRetryCount = MaxRetryCount_
@@ -123,7 +122,7 @@ extends Logging {
var threadPool = Utils.newDaemonCachedThreadPool()
var serverSocket: ServerSocket = null
- serverSocket = new ServerSocket(MasterTrackerPort)
+ serverSocket = new ServerSocket(DriverTrackerPort)
logInfo("TrackMultipleValues started at " + serverSocket)
try {
@@ -235,7 +234,7 @@ extends Logging {
try {
// Connect to the tracker to find out GuideInfo
clientSocketToTracker =
- new Socket(MultiTracker.MasterHostAddress, MultiTracker.MasterTrackerPort)
+ new Socket(MultiTracker.DriverHostAddress, MultiTracker.DriverTrackerPort)
oosTracker =
new ObjectOutputStream(clientSocketToTracker.getOutputStream)
oosTracker.flush()
@@ -276,7 +275,7 @@ extends Logging {
}
def registerBroadcast(id: Long, gInfo: SourceInfo) {
- val socket = new Socket(MultiTracker.MasterHostAddress, MasterTrackerPort)
+ val socket = new Socket(MultiTracker.DriverHostAddress, DriverTrackerPort)
val oosST = new ObjectOutputStream(socket.getOutputStream)
oosST.flush()
val oisST = new ObjectInputStream(socket.getInputStream)
@@ -303,7 +302,7 @@ extends Logging {
}
def unregisterBroadcast(id: Long) {
- val socket = new Socket(MultiTracker.MasterHostAddress, MasterTrackerPort)
+ val socket = new Socket(MultiTracker.DriverHostAddress, DriverTrackerPort)
val oosST = new ObjectOutputStream(socket.getOutputStream)
oosST.flush()
val oisST = new ObjectInputStream(socket.getInputStream)
diff --git a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
index f573512835..c55c476117 100644
--- a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
@@ -98,7 +98,7 @@ extends Broadcast[T](id) with Logging with Serializable {
case None =>
logInfo("Started reading broadcast variable " + id)
- // Initializing everything because Master will only send null/0 values
+ // Initializing everything because Driver will only send null/0 values
// Only the 1st worker in a node can be here. Others will get from cache
initializeWorkerVariables()
@@ -157,55 +157,55 @@ extends Broadcast[T](id) with Logging with Serializable {
listenPortLock.synchronized { listenPortLock.wait() }
}
- var clientSocketToMaster: Socket = null
- var oosMaster: ObjectOutputStream = null
- var oisMaster: ObjectInputStream = null
+ var clientSocketToDriver: Socket = null
+ var oosDriver: ObjectOutputStream = null
+ var oisDriver: ObjectInputStream = null
// Connect and receive broadcast from the specified source, retrying the
// specified number of times in case of failures
var retriesLeft = MultiTracker.MaxRetryCount
do {
- // Connect to Master and send this worker's Information
- clientSocketToMaster = new Socket(MultiTracker.MasterHostAddress, gInfo.listenPort)
- oosMaster = new ObjectOutputStream(clientSocketToMaster.getOutputStream)
- oosMaster.flush()
- oisMaster = new ObjectInputStream(clientSocketToMaster.getInputStream)
+ // Connect to Driver and send this worker's Information
+ clientSocketToDriver = new Socket(MultiTracker.DriverHostAddress, gInfo.listenPort)
+ oosDriver = new ObjectOutputStream(clientSocketToDriver.getOutputStream)
+ oosDriver.flush()
+ oisDriver = new ObjectInputStream(clientSocketToDriver.getInputStream)
- logDebug("Connected to Master's guiding object")
+ logDebug("Connected to Driver's guiding object")
// Send local source information
- oosMaster.writeObject(SourceInfo(hostAddress, listenPort))
- oosMaster.flush()
+ oosDriver.writeObject(SourceInfo(hostAddress, listenPort))
+ oosDriver.flush()
- // Receive source information from Master
- var sourceInfo = oisMaster.readObject.asInstanceOf[SourceInfo]
+ // Receive source information from Driver
+ var sourceInfo = oisDriver.readObject.asInstanceOf[SourceInfo]
totalBlocks = sourceInfo.totalBlocks
arrayOfBlocks = new Array[BroadcastBlock](totalBlocks)
totalBlocksLock.synchronized { totalBlocksLock.notifyAll() }
totalBytes = sourceInfo.totalBytes
- logDebug("Received SourceInfo from Master:" + sourceInfo + " My Port: " + listenPort)
+ logDebug("Received SourceInfo from Driver:" + sourceInfo + " My Port: " + listenPort)
val start = System.nanoTime
val receptionSucceeded = receiveSingleTransmission(sourceInfo)
val time = (System.nanoTime - start) / 1e9
- // Updating some statistics in sourceInfo. Master will be using them later
+ // Updating some statistics in sourceInfo. Driver will be using them later
if (!receptionSucceeded) {
sourceInfo.receptionFailed = true
}
- // Send back statistics to the Master
- oosMaster.writeObject(sourceInfo)
+ // Send back statistics to the Driver
+ oosDriver.writeObject(sourceInfo)
- if (oisMaster != null) {
- oisMaster.close()
+ if (oisDriver != null) {
+ oisDriver.close()
}
- if (oosMaster != null) {
- oosMaster.close()
+ if (oosDriver != null) {
+ oosDriver.close()
}
- if (clientSocketToMaster != null) {
- clientSocketToMaster.close()
+ if (clientSocketToDriver != null) {
+ clientSocketToDriver.close()
}
retriesLeft -= 1
@@ -552,7 +552,7 @@ extends Broadcast[T](id) with Logging with Serializable {
}
private def sendObject() {
- // Wait till receiving the SourceInfo from Master
+ // Wait till receiving the SourceInfo from Driver
while (totalBlocks == -1) {
totalBlocksLock.synchronized { totalBlocksLock.wait() }
}
@@ -576,7 +576,7 @@ extends Broadcast[T](id) with Logging with Serializable {
private[spark] class TreeBroadcastFactory
extends BroadcastFactory {
- def initialize(isMaster: Boolean) { MultiTracker.initialize(isMaster) }
+ def initialize(isDriver: Boolean) { MultiTracker.initialize(isDriver) }
def newBroadcast[T](value_ : T, isLocal: Boolean, id: Long) =
new TreeBroadcast[T](value_, isLocal, id)
diff --git a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
index 8f51051e39..2836574ecb 100644
--- a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
@@ -16,7 +16,7 @@ import scala.collection.mutable.ArrayBuffer
* fault recovery without spinning up a lot of processes.
*/
private[spark]
-class LocalSparkCluster(numSlaves: Int, coresPerSlave: Int, memoryPerSlave: Int) extends Logging {
+class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: Int) extends Logging {
val localIpAddress = Utils.localIpAddress
@@ -25,29 +25,28 @@ class LocalSparkCluster(numSlaves: Int, coresPerSlave: Int, memoryPerSlave: Int)
var masterPort : Int = _
var masterUrl : String = _
- val slaveActorSystems = ArrayBuffer[ActorSystem]()
- val slaveActors = ArrayBuffer[ActorRef]()
+ val workerActorSystems = ArrayBuffer[ActorSystem]()
+ val workerActors = ArrayBuffer[ActorRef]()
def start() : String = {
- logInfo("Starting a local Spark cluster with " + numSlaves + " slaves.")
+ logInfo("Starting a local Spark cluster with " + numWorkers + " workers.")
/* Start the Master */
val (actorSystem, masterPort) = AkkaUtils.createActorSystem("sparkMaster", localIpAddress, 0)
masterActorSystem = actorSystem
masterUrl = "spark://" + localIpAddress + ":" + masterPort
- val actor = masterActorSystem.actorOf(
+ masterActor = masterActorSystem.actorOf(
Props(new Master(localIpAddress, masterPort, 0)), name = "Master")
- masterActor = actor
/* Start the Slaves */
- for (slaveNum <- 1 to numSlaves) {
+ for (workerNum <- 1 to numWorkers) {
val (actorSystem, boundPort) =
- AkkaUtils.createActorSystem("sparkWorker" + slaveNum, localIpAddress, 0)
- slaveActorSystems += actorSystem
+ AkkaUtils.createActorSystem("sparkWorker" + workerNum, localIpAddress, 0)
+ workerActorSystems += actorSystem
val actor = actorSystem.actorOf(
- Props(new Worker(localIpAddress, boundPort, 0, coresPerSlave, memoryPerSlave, masterUrl)),
+ Props(new Worker(localIpAddress, boundPort, 0, coresPerWorker, memoryPerWorker, masterUrl)),
name = "Worker")
- slaveActors += actor
+ workerActors += actor
}
return masterUrl
@@ -55,9 +54,9 @@ class LocalSparkCluster(numSlaves: Int, coresPerSlave: Int, memoryPerSlave: Int)
def stop() {
logInfo("Shutting down local Spark cluster.")
- // Stop the slaves before the master so they don't get upset that it disconnected
- slaveActorSystems.foreach(_.shutdown())
- slaveActorSystems.foreach(_.awaitTermination())
+ // Stop the workers before the master so they don't get upset that it disconnected
+ workerActorSystems.foreach(_.shutdown())
+ workerActorSystems.foreach(_.awaitTermination())
masterActorSystem.shutdown()
masterActorSystem.awaitTermination()
}
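A minimal usage sketch of the renamed local cluster, following the constructor shown above:

    // Two workers with one core and 512 MB each; start() returns the master URL.
    val cluster   = new LocalSparkCluster(numWorkers = 2, coresPerWorker = 1, memoryPerWorker = 512)
    val masterUrl = cluster.start()   // "spark://<local ip>:<master port>"
    // ... submit jobs against masterUrl ...
    cluster.stop()                    // shuts down the workers first, then the master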
diff --git a/core/src/main/scala/spark/deploy/client/ClientListener.scala b/core/src/main/scala/spark/deploy/client/ClientListener.scala
index da6abcc9c2..7035f4b394 100644
--- a/core/src/main/scala/spark/deploy/client/ClientListener.scala
+++ b/core/src/main/scala/spark/deploy/client/ClientListener.scala
@@ -12,7 +12,7 @@ private[spark] trait ClientListener {
def disconnected(): Unit
- def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int): Unit
+ def executorAdded(fullId: String, workerId: String, host: String, cores: Int, memory: Int): Unit
- def executorRemoved(id: String, message: String, exitStatus: Option[Int]): Unit
+ def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit
}
diff --git a/core/src/main/scala/spark/deploy/master/JobInfo.scala b/core/src/main/scala/spark/deploy/master/JobInfo.scala
index 130b031a2a..a274b21c34 100644
--- a/core/src/main/scala/spark/deploy/master/JobInfo.scala
+++ b/core/src/main/scala/spark/deploy/master/JobInfo.scala
@@ -10,7 +10,7 @@ private[spark] class JobInfo(
val id: String,
val desc: JobDescription,
val submitDate: Date,
- val actor: ActorRef)
+ val driver: ActorRef)
{
var state = JobState.WAITING
var executors = new mutable.HashMap[Int, ExecutorInfo]
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 2e7e868579..bc53b70015 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -88,7 +88,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
execOption match {
case Some(exec) => {
exec.state = state
- exec.job.actor ! ExecutorUpdated(execId, state, message, exitStatus)
+ exec.job.driver ! ExecutorUpdated(execId, state, message, exitStatus)
if (ExecutorState.isFinished(state)) {
val jobInfo = idToJob(jobId)
// Remove this executor from the worker and job
@@ -199,7 +199,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec)
worker.actor ! LaunchExecutor(exec.job.id, exec.id, exec.job.desc, exec.cores, exec.memory, sparkHome)
- exec.job.actor ! ExecutorAdded(exec.id, worker.id, worker.host, exec.cores, exec.memory)
+ exec.job.driver ! ExecutorAdded(exec.id, worker.id, worker.host, exec.cores, exec.memory)
}
def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int,
@@ -221,19 +221,19 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
actorToWorker -= worker.actor
addressToWorker -= worker.actor.path.address
for (exec <- worker.executors.values) {
- exec.job.actor ! ExecutorStateChanged(exec.job.id, exec.id, ExecutorState.LOST, None, None)
+ exec.job.driver ! ExecutorStateChanged(exec.job.id, exec.id, ExecutorState.LOST, None, None)
exec.job.executors -= exec.id
}
}
- def addJob(desc: JobDescription, actor: ActorRef): JobInfo = {
+ def addJob(desc: JobDescription, driver: ActorRef): JobInfo = {
val now = System.currentTimeMillis()
val date = new Date(now)
- val job = new JobInfo(now, newJobId(date), desc, date, actor)
+ val job = new JobInfo(now, newJobId(date), desc, date, driver)
jobs += job
idToJob(job.id) = job
- actorToJob(sender) = job
- addressToJob(sender.path.address) = job
+ actorToJob(driver) = job
+ addressToJob(driver.path.address) = job
return job
}
@@ -242,8 +242,8 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
logInfo("Removing job " + job.id)
jobs -= job
idToJob -= job.id
- actorToJob -= job.actor
- addressToWorker -= job.actor.path.address
+ actorToJob -= job.driver
+ addressToWorker -= job.driver.path.address
completedJobs += job // Remember it in our history
waitingJobs -= job
for (exec <- job.executors.values) {
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
index 50871802ea..e45288ff53 100644
--- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
@@ -16,7 +16,7 @@ import spark.scheduler.cluster.RegisterExecutor
private[spark] class StandaloneExecutorBackend(
executor: Executor,
- masterUrl: String,
+ driverUrl: String,
executorId: String,
hostname: String,
cores: Int)
@@ -24,25 +24,25 @@ private[spark] class StandaloneExecutorBackend(
with ExecutorBackend
with Logging {
- var master: ActorRef = null
+ var driver: ActorRef = null
override def preStart() {
try {
- logInfo("Connecting to master: " + masterUrl)
- master = context.actorFor(masterUrl)
- master ! RegisterExecutor(executorId, hostname, cores)
+ logInfo("Connecting to driver: " + driverUrl)
+ driver = context.actorFor(driverUrl)
+ driver ! RegisterExecutor(executorId, hostname, cores)
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
- context.watch(master) // Doesn't work with remote actors, but useful for testing
+ context.watch(driver) // Doesn't work with remote actors, but useful for testing
} catch {
case e: Exception =>
- logError("Failed to connect to master", e)
+ logError("Failed to connect to driver", e)
System.exit(1)
}
}
override def receive = {
case RegisteredExecutor(sparkProperties) =>
- logInfo("Successfully registered with master")
+ logInfo("Successfully registered with driver")
executor.initialize(executorId, hostname, sparkProperties)
case RegisterExecutorFailed(message) =>
@@ -55,24 +55,24 @@ private[spark] class StandaloneExecutorBackend(
}
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
- master ! StatusUpdate(executorId, taskId, state, data)
+ driver ! StatusUpdate(executorId, taskId, state, data)
}
}
private[spark] object StandaloneExecutorBackend {
- def run(masterUrl: String, executorId: String, hostname: String, cores: Int) {
+ def run(driverUrl: String, executorId: String, hostname: String, cores: Int) {
// Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
// before getting started with all our system properties, etc
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0)
val actor = actorSystem.actorOf(
- Props(new StandaloneExecutorBackend(new Executor, masterUrl, executorId, hostname, cores)),
+ Props(new StandaloneExecutorBackend(new Executor, driverUrl, executorId, hostname, cores)),
name = "Executor")
actorSystem.awaitTermination()
}
def main(args: Array[String]) {
if (args.length != 4) {
- System.err.println("Usage: StandaloneExecutorBackend <master> <executorId> <hostname> <cores>")
+ System.err.println("Usage: StandaloneExecutorBackend <driverUrl> <executorId> <hostname> <cores>")
System.exit(1)
}
run(args(0), args(1), args(2), args(3).toInt)
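A hypothetical invocation of the renamed entry point; the Akka URL format is assumed to match the driver-side actor URLs used elsewhere in this change, and the actor name shown is illustrative only:

    // Programmatic equivalent of: StandaloneExecutorBackend <driverUrl> <executorId> <hostname> <cores>
    StandaloneExecutorBackend.run(
      "akka://spark@192.168.1.10:7077/user/StandaloneScheduler",  // hypothetical driver URL
      "executor-1",
      "worker-host-1",
      4)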
diff --git a/core/src/main/scala/spark/rdd/CartesianRDD.scala b/core/src/main/scala/spark/rdd/CartesianRDD.scala
index 453d410ad4..0f9ca06531 100644
--- a/core/src/main/scala/spark/rdd/CartesianRDD.scala
+++ b/core/src/main/scala/spark/rdd/CartesianRDD.scala
@@ -1,7 +1,7 @@
package spark.rdd
import java.io.{ObjectOutputStream, IOException}
-import spark.{OneToOneDependency, NarrowDependency, RDD, SparkContext, Split, TaskContext}
+import spark._
private[spark]
@@ -35,7 +35,7 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest](
val numSplitsInRdd2 = rdd2.splits.size
- @transient var splits_ = {
+ override def getSplits: Array[Split] = {
// create the cross product split
val array = new Array[Split](rdd1.splits.size * rdd2.splits.size)
for (s1 <- rdd1.splits; s2 <- rdd2.splits) {
@@ -45,8 +45,6 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest](
array
}
- override def getSplits = splits_
-
override def getPreferredLocations(split: Split) = {
val currSplit = split.asInstanceOf[CartesianSplit]
rdd1.preferredLocations(currSplit.s1) ++ rdd2.preferredLocations(currSplit.s2)
@@ -58,7 +56,7 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest](
y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
}
- var deps_ = List(
+ override def getDependencies: Seq[Dependency[_]] = List(
new NarrowDependency(rdd1) {
def getParents(id: Int): Seq[Int] = List(id / numSplitsInRdd2)
},
@@ -67,11 +65,7 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest](
}
)
- override def getDependencies = deps_
-
override def clearDependencies() {
- deps_ = Nil
- splits_ = null
rdd1 = null
rdd2 = null
}
diff --git a/core/src/main/scala/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
index 6f00f6ac73..96b593ba7c 100644
--- a/core/src/main/scala/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
@@ -9,23 +9,26 @@ import org.apache.hadoop.fs.Path
import java.io.{File, IOException, EOFException}
import java.text.NumberFormat
-private[spark] class CheckpointRDDSplit(idx: Int, val splitFile: String) extends Split {
- override val index: Int = idx
-}
+private[spark] class CheckpointRDDSplit(val index: Int) extends Split {}
/**
* This RDD represents a RDD checkpoint file (similar to HadoopRDD).
*/
private[spark]
-class CheckpointRDD[T: ClassManifest](sc: SparkContext, checkpointPath: String)
+class CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath: String)
extends RDD[T](sc, Nil) {
- @transient val path = new Path(checkpointPath)
- @transient val fs = path.getFileSystem(new Configuration())
+ @transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration)
@transient val splits_ : Array[Split] = {
- val splitFiles = fs.listStatus(path).map(_.getPath.toString).filter(_.contains("part-")).sorted
- splitFiles.zipWithIndex.map(x => new CheckpointRDDSplit(x._2, x._1)).toArray
+ val dirContents = fs.listStatus(new Path(checkpointPath))
+ val splitFiles = dirContents.map(_.getPath.toString).filter(_.contains("part-")).sorted
+ val numSplits = splitFiles.size
+ if (!splitFiles(0).endsWith(CheckpointRDD.splitIdToFile(0)) ||
+ !splitFiles(numSplits-1).endsWith(CheckpointRDD.splitIdToFile(numSplits-1))) {
+ throw new SparkException("Invalid checkpoint directory: " + checkpointPath)
+ }
+ Array.tabulate(numSplits)(i => new CheckpointRDDSplit(i))
}
checkpointData = Some(new RDDCheckpointData[T](this))
@@ -34,36 +37,34 @@ class CheckpointRDD[T: ClassManifest](sc: SparkContext, checkpointPath: String)
override def getSplits = splits_
override def getPreferredLocations(split: Split): Seq[String] = {
- val status = fs.getFileStatus(path)
+ val status = fs.getFileStatus(new Path(checkpointPath))
val locations = fs.getFileBlockLocations(status, 0, status.getLen)
- locations.firstOption.toList.flatMap(_.getHosts).filter(_ != "localhost")
+ locations.headOption.toList.flatMap(_.getHosts).filter(_ != "localhost")
}
override def compute(split: Split, context: TaskContext): Iterator[T] = {
- CheckpointRDD.readFromFile(split.asInstanceOf[CheckpointRDDSplit].splitFile, context)
+ val file = new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index))
+ CheckpointRDD.readFromFile(file, context)
}
override def checkpoint() {
- // Do nothing. Hadoop RDD should not be checkpointed.
+ // Do nothing. CheckpointRDD should not be checkpointed.
}
}
private[spark] object CheckpointRDD extends Logging {
- def splitIdToFileName(splitId: Int): String = {
- val numfmt = NumberFormat.getInstance()
- numfmt.setMinimumIntegerDigits(5)
- numfmt.setGroupingUsed(false)
- "part-" + numfmt.format(splitId)
+ def splitIdToFile(splitId: Int): String = {
+ "part-%05d".format(splitId)
}
- def writeToFile[T](path: String, blockSize: Int = -1)(context: TaskContext, iterator: Iterator[T]) {
+ def writeToFile[T](path: String, blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) {
val outputDir = new Path(path)
val fs = outputDir.getFileSystem(new Configuration())
- val finalOutputName = splitIdToFileName(context.splitId)
+ val finalOutputName = splitIdToFile(ctx.splitId)
val finalOutputPath = new Path(outputDir, finalOutputName)
- val tempOutputPath = new Path(outputDir, "." + finalOutputName + "-attempt-" + context.attemptId)
+ val tempOutputPath = new Path(outputDir, "." + finalOutputName + "-attempt-" + ctx.attemptId)
if (fs.exists(tempOutputPath)) {
throw new IOException("Checkpoint failed: temporary path " +
@@ -83,22 +84,22 @@ private[spark] object CheckpointRDD extends Logging {
serializeStream.close()
if (!fs.rename(tempOutputPath, finalOutputPath)) {
- if (!fs.delete(finalOutputPath, true)) {
- throw new IOException("Checkpoint failed: failed to delete earlier output of task "
- + context.attemptId)
- }
- if (!fs.rename(tempOutputPath, finalOutputPath)) {
+ if (!fs.exists(finalOutputPath)) {
+ fs.delete(tempOutputPath, false)
throw new IOException("Checkpoint failed: failed to save output of task: "
- + context.attemptId)
+ + ctx.attemptId + " and final output path does not exist")
+ } else {
+ // Some other copy of this task must've finished before us and renamed it
+ logInfo("Final output path " + finalOutputPath + " already exists; not overwriting it")
+ fs.delete(tempOutputPath, false)
}
}
}
- def readFromFile[T](path: String, context: TaskContext): Iterator[T] = {
- val inputPath = new Path(path)
- val fs = inputPath.getFileSystem(new Configuration())
+ def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = {
+ val fs = path.getFileSystem(new Configuration())
val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
- val fileInputStream = fs.open(inputPath, bufferSize)
+ val fileInputStream = fs.open(path, bufferSize)
val serializer = SparkEnv.get.serializer.newInstance()
val deserializeStream = serializer.deserializeStream(fileInputStream)
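
A hedged sketch of the two ideas in the CheckpointRDD hunks above: split files are named with zero-padded ids in the splitIdToFile style, and the writer tolerates losing the rename race to another attempt of the same task instead of failing. Plain java.io.File stands in for the Hadoop FileSystem here, and all names are illustrative.

```scala
import java.io.{File, IOException}

object CheckpointWriteSketch {
  // Same "part-NNNNN" naming convention as splitIdToFile above.
  def splitIdToFile(splitId: Int): String = "part-%05d".format(splitId)

  // Commit a temp file to its final name; if another attempt already renamed
  // its copy, keep that output and discard ours instead of throwing.
  def commit(temp: File, finalFile: File, attemptId: Long): Unit = {
    if (!temp.renameTo(finalFile)) {
      if (finalFile.exists()) {
        println("Final output " + finalFile + " already exists; not overwriting it")
        temp.delete()
      } else {
        temp.delete()
        throw new IOException("Checkpoint failed: could not save output of attempt " + attemptId)
      }
    }
  }

  def main(args: Array[String]): Unit =
    println(splitIdToFile(7)) // part-00007
}
```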
diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
index 167755bbba..4c57434b65 100644
--- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
@@ -27,11 +27,11 @@ private[spark] case class CoalescedRDDSplit(
* or to avoid having a large number of small tasks when processing a directory with many files.
*/
class CoalescedRDD[T: ClassManifest](
- var prev: RDD[T],
+ @transient var prev: RDD[T],
maxPartitions: Int)
- extends RDD[T](prev.context, Nil) { // Nil, so the dependencies_ var does not refer to parent RDDs
+ extends RDD[T](prev.context, Nil) { // Nil since we implement getDependencies
- @transient var splits_ : Array[Split] = {
+ override def getSplits: Array[Split] = {
val prevSplits = prev.splits
if (prevSplits.length < maxPartitions) {
prevSplits.map(_.index).map{idx => new CoalescedRDDSplit(idx, prev, Array(idx)) }
@@ -44,26 +44,20 @@ class CoalescedRDD[T: ClassManifest](
}
}
- override def getSplits = splits_
-
override def compute(split: Split, context: TaskContext): Iterator[T] = {
split.asInstanceOf[CoalescedRDDSplit].parents.iterator.flatMap { parentSplit =>
firstParent[T].iterator(parentSplit, context)
}
}
- var deps_ : List[Dependency[_]] = List(
+ override def getDependencies: Seq[Dependency[_]] = List(
new NarrowDependency(prev) {
def getParents(id: Int): Seq[Int] =
splits(id).asInstanceOf[CoalescedRDDSplit].parentsIndices
}
)
- override def getDependencies() = deps_
-
override def clearDependencies() {
- deps_ = Nil
- splits_ = null
prev = null
}
}
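
The body of the new getSplits is cut off by the hunk above, so the grouping below is only an assumption (contiguous, roughly even index ranges); it illustrates how N parent partitions could collapse into at most maxPartitions coalesced splits.

```scala
object CoalesceSketch {
  // Assumed grouping: each coalesced split i covers a contiguous slice of the
  // parent's partition indices; with fewer parent splits than maxPartitions,
  // every parent partition keeps its own split.
  def group(numParentSplits: Int, maxPartitions: Int): Seq[Seq[Int]] =
    if (numParentSplits < maxPartitions) {
      (0 until numParentSplits).map(Seq(_))
    } else {
      (0 until maxPartitions).map { i =>
        val start = (i * numParentSplits) / maxPartitions
        val end = ((i + 1) * numParentSplits) / maxPartitions
        (start until end).toList
      }
    }

  def main(args: Array[String]): Unit =
    println(group(10, 3)) // three groups covering indices 0-2, 3-5, 6-9
}
```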
diff --git a/core/src/main/scala/spark/rdd/MappedRDD.scala b/core/src/main/scala/spark/rdd/MappedRDD.scala
index c6ceb272cd..5466c9c657 100644
--- a/core/src/main/scala/spark/rdd/MappedRDD.scala
+++ b/core/src/main/scala/spark/rdd/MappedRDD.scala
@@ -3,13 +3,11 @@ package spark.rdd
import spark.{RDD, Split, TaskContext}
private[spark]
-class MappedRDD[U: ClassManifest, T: ClassManifest](
- prev: RDD[T],
- f: T => U)
+class MappedRDD[U: ClassManifest, T: ClassManifest](prev: RDD[T], f: T => U)
extends RDD[U](prev) {
override def getSplits = firstParent[T].splits
override def compute(split: Split, context: TaskContext) =
firstParent[T].iterator(split, context).map(f)
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala b/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala
index 97dd37950e..b8482338c6 100644
--- a/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala
+++ b/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala
@@ -7,23 +7,18 @@ import spark.{PruneDependency, RDD, SparkEnv, Split, TaskContext}
* all partitions. An example use case: If we know the RDD is partitioned by range,
* and the execution DAG has a filter on the key, we can avoid launching tasks
* on partitions that don't have the range covering the key.
+ *
+ * TODO: This currently doesn't give partition IDs properly!
*/
class PartitionPruningRDD[T: ClassManifest](
@transient prev: RDD[T],
@transient partitionFilterFunc: Int => Boolean)
extends RDD[T](prev.context, List(new PruneDependency(prev, partitionFilterFunc))) {
- @transient
- var partitions_ : Array[Split] = dependencies_.head.asInstanceOf[PruneDependency[T]].partitions
-
override def compute(split: Split, context: TaskContext) = firstParent[T].iterator(split, context)
- override protected def getSplits = partitions_
+ override protected def getSplits =
+ getDependencies.head.asInstanceOf[PruneDependency[T]].partitions
override val partitioner = firstParent[T].partitioner
-
- override def clearDependencies() {
- super.clearDependencies()
- partitions_ = null
- }
}
diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
index 28ff19876d..d396478673 100644
--- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
@@ -22,16 +22,10 @@ class ShuffledRDD[K, V](
override val partitioner = Some(part)
- @transient var splits_ = Array.tabulate[Split](part.numPartitions)(i => new ShuffledRDDSplit(i))
-
- override def getSplits = splits_
+ override def getSplits = Array.tabulate[Split](part.numPartitions)(i => new ShuffledRDDSplit(i))
override def compute(split: Split, context: TaskContext): Iterator[(K, V)] = {
val shuffledId = dependencies.head.asInstanceOf[ShuffleDependency[K, V]].shuffleId
SparkEnv.get.shuffleFetcher.fetch[K, V](shuffledId, split.index)
}
-
- override def clearDependencies() {
- splits_ = null
- }
}
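
A small illustration of the getSplits change above: instead of caching splits in a @transient var and nulling them in clearDependencies, partitions can be produced on demand with Array.tabulate. The names below are placeholders, not the real ShuffledRDD types.

```scala
object TabulateSplitsSketch {
  // Stand-in for ShuffledRDDSplit; only the index matters here.
  case class Split(index: Int)

  // One split per output partition of the partitioner, built on demand.
  def getSplits(numPartitions: Int): Array[Split] =
    Array.tabulate(numPartitions)(i => Split(i))

  def main(args: Array[String]): Unit =
    println(getSplits(4).toList) // List(Split(0), Split(1), Split(2), Split(3))
}
```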
diff --git a/core/src/main/scala/spark/rdd/UnionRDD.scala b/core/src/main/scala/spark/rdd/UnionRDD.scala
index 82f0a44ecd..26a2d511f2 100644
--- a/core/src/main/scala/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/spark/rdd/UnionRDD.scala
@@ -26,9 +26,9 @@ private[spark] class UnionSplit[T: ClassManifest](idx: Int, rdd: RDD[T], splitIn
class UnionRDD[T: ClassManifest](
sc: SparkContext,
@transient var rdds: Seq[RDD[T]])
- extends RDD[T](sc, Nil) { // Nil, so the dependencies_ var does not refer to parent RDDs
+ extends RDD[T](sc, Nil) { // Nil since we implement getDependencies
- @transient var splits_ : Array[Split] = {
+ override def getSplits: Array[Split] = {
val array = new Array[Split](rdds.map(_.splits.size).sum)
var pos = 0
for (rdd <- rdds; split <- rdd.splits) {
@@ -38,20 +38,16 @@ class UnionRDD[T: ClassManifest](
array
}
- override def getSplits = splits_
-
- @transient var deps_ = {
+ override def getDependencies: Seq[Dependency[_]] = {
val deps = new ArrayBuffer[Dependency[_]]
var pos = 0
for (rdd <- rdds) {
deps += new RangeDependency(rdd, 0, pos, rdd.splits.size)
pos += rdd.splits.size
}
- deps.toList
+ deps
}
- override def getDependencies = deps_
-
override def compute(s: Split, context: TaskContext): Iterator[T] =
s.asInstanceOf[UnionSplit[T]].iterator(context)
@@ -59,8 +55,6 @@ class UnionRDD[T: ClassManifest](
s.asInstanceOf[UnionSplit[T]].preferredLocations()
override def clearDependencies() {
- deps_ = null
- splits_ = null
rdds = null
}
}
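
The new getDependencies above gives each parent RDD a contiguous range of the union's partition ids, starting where the previous parent ended. A Spark-free sketch of that offset bookkeeping, with plain integers standing in for RDDs and splits:

```scala
object UnionOffsetsSketch {
  // Returns (parentIndex, outStart, length) for each parent, mirroring the
  // RangeDependency(rdd, 0, pos, rdd.splits.size) construction above.
  def ranges(parentSizes: Seq[Int]): Seq[(Int, Int, Int)] = {
    var pos = 0
    parentSizes.zipWithIndex.map { case (size, i) =>
      val range = (i, pos, size)
      pos += size
      range
    }
  }

  def main(args: Array[String]): Unit =
    println(ranges(Seq(3, 5, 2))) // List((0,0,3), (1,3,5), (2,8,2))
}
```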
diff --git a/core/src/main/scala/spark/rdd/ZippedRDD.scala b/core/src/main/scala/spark/rdd/ZippedRDD.scala
index d950b06c85..e5df6d8c72 100644
--- a/core/src/main/scala/spark/rdd/ZippedRDD.scala
+++ b/core/src/main/scala/spark/rdd/ZippedRDD.scala
@@ -32,9 +32,7 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest](
extends RDD[(T, U)](sc, List(new OneToOneDependency(rdd1), new OneToOneDependency(rdd2)))
with Serializable {
- // TODO: FIX THIS.
-
- @transient var splits_ : Array[Split] = {
+ override def getSplits: Array[Split] = {
if (rdd1.splits.size != rdd2.splits.size) {
throw new IllegalArgumentException("Can't zip RDDs with unequal numbers of partitions")
}
@@ -45,8 +43,6 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest](
array
}
- override def getSplits = splits_
-
override def compute(s: Split, context: TaskContext): Iterator[(T, U)] = {
val (split1, split2) = s.asInstanceOf[ZippedSplit[T, U]].splits
rdd1.iterator(split1, context).zip(rdd2.iterator(split2, context))
@@ -58,7 +54,6 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest](
}
override def clearDependencies() {
- splits_ = null
rdd1 = null
rdd2 = null
}
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index bd541d4207..b130be6a38 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -308,10 +308,10 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
} else {
// TODO: We might want to run this less often, when we are sure that something has become
// runnable that wasn't before.
- logDebug("Checking for newly runnable parent stages")
- logDebug("running: " + running)
- logDebug("waiting: " + waiting)
- logDebug("failed: " + failed)
+ logTrace("Checking for newly runnable parent stages")
+ logTrace("running: " + running)
+ logTrace("waiting: " + waiting)
+ logTrace("failed: " + failed)
val waiting2 = waiting.toArray
waiting.clear()
for (stage <- waiting2.sortBy(_.priority)) {
@@ -393,6 +393,9 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
logDebug("New pending tasks: " + myPending)
taskSched.submitTasks(
new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.priority))
+ if (!stage.submissionTime.isDefined) {
+ stage.submissionTime = Some(System.currentTimeMillis())
+ }
} else {
logDebug("Stage " + stage + " is actually done; %b %d %d".format(
stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
@@ -407,6 +410,15 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
def handleTaskCompletion(event: CompletionEvent) {
val task = event.task
val stage = idToStage(task.stageId)
+
+ def markStageAsFinished(stage: Stage) = {
+ val serviceTime = stage.submissionTime match {
+ case Some(t) => "%.03f".format((System.currentTimeMillis() - t) / 1000.0)
+        case _ => "Unknown"
+ }
+ logInfo("%s (%s) finished in %s s".format(stage, stage.origin, serviceTime))
+ running -= stage
+ }
event.reason match {
case Success =>
logInfo("Completed " + task)
@@ -421,13 +433,13 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
if (!job.finished(rt.outputId)) {
job.finished(rt.outputId) = true
job.numFinished += 1
- job.listener.taskSucceeded(rt.outputId, event.result)
// If the whole job has finished, remove it
if (job.numFinished == job.numPartitions) {
activeJobs -= job
resultStageToJob -= stage
- running -= stage
+ markStageAsFinished(stage)
}
+ job.listener.taskSucceeded(rt.outputId, event.result)
}
case None =>
logInfo("Ignoring result from " + rt + " because its job has finished")
@@ -444,8 +456,8 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
stage.addOutputLoc(smt.partition, status)
}
if (running.contains(stage) && pendingTasks(stage).isEmpty) {
- logInfo(stage + " (" + stage.origin + ") finished; looking for newly runnable stages")
- running -= stage
+ markStageAsFinished(stage)
+ logInfo("looking for newly runnable stages")
logInfo("running: " + running)
logInfo("waiting: " + waiting)
logInfo("failed: " + failed)
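
The DAGScheduler and Stage changes above add a submissionTime field that is set only on the first task submission and read back when the stage finishes. A minimal sketch of that timing pattern, outside of any Spark class:

```scala
object StageTimingSketch {
  var submissionTime: Option[Long] = None

  // Record the time only on the first submission, as the scheduler now does.
  def submit(): Unit =
    if (!submissionTime.isDefined) submissionTime = Some(System.currentTimeMillis())

  // Format the elapsed service time, or "Unknown" if the stage was never submitted.
  def serviceTime(): String = submissionTime match {
    case Some(t) => "%.03f".format((System.currentTimeMillis() - t) / 1000.0)
    case None    => "Unknown"
  }

  def main(args: Array[String]): Unit = {
    submit()
    Thread.sleep(50)
    println("stage finished in " + serviceTime() + " s")
  }
}
```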
diff --git a/core/src/main/scala/spark/scheduler/Stage.scala b/core/src/main/scala/spark/scheduler/Stage.scala
index e9419728e3..374114d870 100644
--- a/core/src/main/scala/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/spark/scheduler/Stage.scala
@@ -32,6 +32,9 @@ private[spark] class Stage(
val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)
var numAvailableOutputs = 0
+ /** When first task was submitted to scheduler. */
+ var submissionTime: Option[Long] = None
+
private var nextAttemptId = 0
def isAvailable: Boolean = {
diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 6dd3ae003d..9760d23072 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -33,10 +33,11 @@ private[spark] class SparkDeploySchedulerBackend(
override def start() {
super.start()
- val masterUrl = "akka://spark@%s:%s/user/%s".format(
- System.getProperty("spark.master.host"), System.getProperty("spark.master.port"),
+ // The endpoint for executors to talk to us
+ val driverUrl = "akka://spark@%s:%s/user/%s".format(
+ System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
StandaloneSchedulerBackend.ACTOR_NAME)
- val args = Seq(masterUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
+ val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
val command = Command("spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)
val sparkHome = sc.getSparkHome().getOrElse(throw new IllegalArgumentException("must supply spark home for spark standalone"))
val jobDesc = new JobDescription(jobName, maxCores, executorMemory, command, sparkHome)
@@ -54,23 +55,23 @@ private[spark] class SparkDeploySchedulerBackend(
}
}
- def connected(jobId: String) {
+ override def connected(jobId: String) {
logInfo("Connected to Spark cluster with job ID " + jobId)
}
- def disconnected() {
+ override def disconnected() {
if (!stopping) {
logError("Disconnected from Spark cluster!")
scheduler.error("Disconnected from Spark cluster")
}
}
- def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int) {
+ override def executorAdded(executorId: String, workerId: String, host: String, cores: Int, memory: Int) {
logInfo("Granted executor ID %s on host %s with %d cores, %s RAM".format(
- id, host, cores, Utils.memoryMegabytesToString(memory)))
+ executorId, host, cores, Utils.memoryMegabytesToString(memory)))
}
- def executorRemoved(executorId: String, message: String, exitStatus: Option[Int]) {
+ override def executorRemoved(executorId: String, message: String, exitStatus: Option[Int]) {
val reason: ExecutorLossReason = exitStatus match {
case Some(code) => ExecutorExited(code)
case None => SlaveLost(message)
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
index c68f15bdfa..da7dcf4b6b 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
@@ -6,7 +6,7 @@ import spark.util.SerializableBuffer
private[spark] sealed trait StandaloneClusterMessage extends Serializable
-// Master to slaves
+// Driver to executors
private[spark]
case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage
@@ -17,7 +17,7 @@ case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
private[spark]
case class RegisterExecutorFailed(message: String) extends StandaloneClusterMessage
-// Executors to master
+// Executors to driver
private[spark]
case class RegisterExecutor(executorId: String, host: String, cores: Int)
extends StandaloneClusterMessage
@@ -34,6 +34,6 @@ object StatusUpdate {
}
}
-// Internal messages in master
+// Internal messages in driver
private[spark] case object ReviveOffers extends StandaloneClusterMessage
-private[spark] case object StopMaster extends StandaloneClusterMessage
+private[spark] case object StopDriver extends StandaloneClusterMessage
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 69822f568c..082022be1c 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -23,7 +23,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
var totalCoreCount = new AtomicInteger(0)
- class MasterActor(sparkProperties: Seq[(String, String)]) extends Actor {
+ class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
val executorActor = new HashMap[String, ActorRef]
val executorAddress = new HashMap[String, Address]
val executorHost = new HashMap[String, String]
@@ -64,7 +64,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
case ReviveOffers =>
makeOffers()
- case StopMaster =>
+ case StopDriver =>
sender ! true
context.stop(self)
@@ -113,10 +113,10 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
}
}
- var masterActor: ActorRef = null
+ var driverActor: ActorRef = null
val taskIdsOnSlave = new HashMap[String, HashSet[String]]
- def start() {
+ override def start() {
val properties = new ArrayBuffer[(String, String)]
val iterator = System.getProperties.entrySet.iterator
while (iterator.hasNext) {
@@ -126,15 +126,15 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
properties += ((key, value))
}
}
- masterActor = actorSystem.actorOf(
- Props(new MasterActor(properties)), name = StandaloneSchedulerBackend.ACTOR_NAME)
+ driverActor = actorSystem.actorOf(
+ Props(new DriverActor(properties)), name = StandaloneSchedulerBackend.ACTOR_NAME)
}
- def stop() {
+ override def stop() {
try {
- if (masterActor != null) {
+ if (driverActor != null) {
val timeout = 5.seconds
- val future = masterActor.ask(StopMaster)(timeout)
+ val future = driverActor.ask(StopDriver)(timeout)
Await.result(future, timeout)
}
} catch {
@@ -143,11 +143,11 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
}
}
- def reviveOffers() {
- masterActor ! ReviveOffers
+ override def reviveOffers() {
+ driverActor ! ReviveOffers
}
- def defaultParallelism(): Int = math.max(totalCoreCount.get(), 2)
+ override def defaultParallelism(): Int = math.max(totalCoreCount.get(), 2)
}
private[spark] object StandaloneSchedulerBackend {
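
The stop() path above is a blocking ask with a five-second timeout: send StopDriver and wait for the actor to reply true before returning. Below is an Akka-free sketch of that handshake using only the standard library's Future and Promise; the names and structure are illustrative, not the backend's actual code.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object StopHandshakeSketch {
  case object StopDriver

  // Stands in for the DriverActor: acknowledge a stop request with true.
  def handle(msg: Any, reply: Promise[Boolean]): Unit = msg match {
    case StopDriver => reply.success(true)
    case _          => ()
  }

  def main(args: Array[String]): Unit = {
    val reply = Promise[Boolean]()
    Future { handle(StopDriver, reply) }            // the "driver" side
    val ok = Await.result(reply.future, 5.seconds)  // the caller side, as in stop()
    println("driver acknowledged stop: " + ok)
  }
}
```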
diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
index 014906b028..7bf56a05d6 100644
--- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
@@ -104,11 +104,11 @@ private[spark] class CoarseMesosSchedulerBackend(
def createCommand(offer: Offer, numCores: Int): CommandInfo = {
val runScript = new File(sparkHome, "run").getCanonicalPath
- val masterUrl = "akka://spark@%s:%s/user/%s".format(
- System.getProperty("spark.master.host"), System.getProperty("spark.master.port"),
+ val driverUrl = "akka://spark@%s:%s/user/%s".format(
+ System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
StandaloneSchedulerBackend.ACTOR_NAME)
val command = "\"%s\" spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
- runScript, masterUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)
+ runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)
val environment = Environment.newBuilder()
sc.executorEnvs.foreach { case (key, value) =>
environment.addVariables(Environment.Variable.newBuilder()
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 1215d5f5c8..c61fd75c2b 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -243,7 +243,7 @@ class BlockManager(
val startTimeMs = System.currentTimeMillis
var managers = master.getLocations(blockId)
val locations = managers.map(_.ip)
- logDebug("Get block locations in " + Utils.getUsedTimeMs(startTimeMs))
+ logDebug("Got block locations in " + Utils.getUsedTimeMs(startTimeMs))
return locations
}
@@ -253,7 +253,7 @@ class BlockManager(
def getLocations(blockIds: Array[String]): Array[Seq[String]] = {
val startTimeMs = System.currentTimeMillis
val locations = master.getLocations(blockIds).map(_.map(_.ip).toSeq).toArray
- logDebug("Get multiple block location in " + Utils.getUsedTimeMs(startTimeMs))
+    logDebug("Got multiple block locations in " + Utils.getUsedTimeMs(startTimeMs))
return locations
}
@@ -645,7 +645,7 @@ class BlockManager(
var size = 0L
myInfo.synchronized {
- logDebug("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
+ logTrace("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
+ " to get into synchronized block")
if (level.useMemory) {
@@ -677,8 +677,10 @@ class BlockManager(
}
logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs))
+
// Replicate block if required
if (level.replication > 1) {
+ val remoteStartTime = System.currentTimeMillis
// Serialize the block if not already done
if (bytesAfterPut == null) {
if (valuesAfterPut == null) {
@@ -688,12 +690,10 @@ class BlockManager(
bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
}
replicate(blockId, bytesAfterPut, level)
+ logDebug("Put block " + blockId + " remotely took " + Utils.getUsedTimeMs(remoteStartTime))
}
-
BlockManager.dispose(bytesAfterPut)
- logDebug("Put block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs))
-
return size
}
@@ -978,7 +978,7 @@ object BlockManager extends Logging {
*/
def dispose(buffer: ByteBuffer) {
if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) {
- logDebug("Unmapping " + buffer)
+ logTrace("Unmapping " + buffer)
if (buffer.asInstanceOf[DirectBuffer].cleaner() != null) {
buffer.asInstanceOf[DirectBuffer].cleaner().clean()
}
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 55ff1dde9c..36398095a2 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -15,52 +15,51 @@ import akka.util.duration._
import spark.{Logging, SparkException, Utils}
-
private[spark] class BlockManagerMaster(
val actorSystem: ActorSystem,
- isMaster: Boolean,
+ isDriver: Boolean,
isLocal: Boolean,
- masterIp: String,
- masterPort: Int)
+ driverIp: String,
+ driverPort: Int)
extends Logging {
val AKKA_RETRY_ATTEMPTS: Int = System.getProperty("spark.akka.num.retries", "3").toInt
val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "3000").toInt
- val MASTER_AKKA_ACTOR_NAME = "BlockMasterManager"
+ val DRIVER_AKKA_ACTOR_NAME = "BlockMasterManager"
val SLAVE_AKKA_ACTOR_NAME = "BlockSlaveManager"
val DEFAULT_MANAGER_IP: String = Utils.localHostName()
val timeout = 10.seconds
- var masterActor: ActorRef = {
- if (isMaster) {
- val masterActor = actorSystem.actorOf(Props(new BlockManagerMasterActor(isLocal)),
- name = MASTER_AKKA_ACTOR_NAME)
+ var driverActor: ActorRef = {
+ if (isDriver) {
+ val driverActor = actorSystem.actorOf(Props(new BlockManagerMasterActor(isLocal)),
+ name = DRIVER_AKKA_ACTOR_NAME)
logInfo("Registered BlockManagerMaster Actor")
- masterActor
+ driverActor
} else {
- val url = "akka://spark@%s:%s/user/%s".format(masterIp, masterPort, MASTER_AKKA_ACTOR_NAME)
+ val url = "akka://spark@%s:%s/user/%s".format(driverIp, driverPort, DRIVER_AKKA_ACTOR_NAME)
logInfo("Connecting to BlockManagerMaster: " + url)
actorSystem.actorFor(url)
}
}
- /** Remove a dead executor from the master actor. This is only called on the master side. */
+ /** Remove a dead executor from the driver actor. This is only called on the driver side. */
def removeExecutor(execId: String) {
tell(RemoveExecutor(execId))
logInfo("Removed " + execId + " successfully in removeExecutor")
}
/**
- * Send the master actor a heart beat from the slave. Returns true if everything works out,
- * false if the master does not know about the given block manager, which means the block
+ * Send the driver actor a heart beat from the slave. Returns true if everything works out,
+ * false if the driver does not know about the given block manager, which means the block
* manager should re-register.
*/
def sendHeartBeat(blockManagerId: BlockManagerId): Boolean = {
- askMasterWithRetry[Boolean](HeartBeat(blockManagerId))
+ askDriverWithReply[Boolean](HeartBeat(blockManagerId))
}
- /** Register the BlockManager's id with the master. */
+ /** Register the BlockManager's id with the driver. */
def registerBlockManager(
blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
logInfo("Trying to register BlockManager")
@@ -74,25 +73,25 @@ private[spark] class BlockManagerMaster(
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long): Boolean = {
- val res = askMasterWithRetry[Boolean](
+ val res = askDriverWithReply[Boolean](
UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize))
logInfo("Updated info of block " + blockId)
res
}
- /** Get locations of the blockId from the master */
+ /** Get locations of the blockId from the driver */
def getLocations(blockId: String): Seq[BlockManagerId] = {
- askMasterWithRetry[Seq[BlockManagerId]](GetLocations(blockId))
+ askDriverWithReply[Seq[BlockManagerId]](GetLocations(blockId))
}
- /** Get locations of multiple blockIds from the master */
+ /** Get locations of multiple blockIds from the driver */
def getLocations(blockIds: Array[String]): Seq[Seq[BlockManagerId]] = {
- askMasterWithRetry[Seq[Seq[BlockManagerId]]](GetLocationsMultipleBlockIds(blockIds))
+ askDriverWithReply[Seq[Seq[BlockManagerId]]](GetLocationsMultipleBlockIds(blockIds))
}
- /** Get ids of other nodes in the cluster from the master */
+ /** Get ids of other nodes in the cluster from the driver */
def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = {
- val result = askMasterWithRetry[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers))
+ val result = askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers))
if (result.length != numPeers) {
throw new SparkException(
"Error getting peers, only got " + result.size + " instead of " + numPeers)
@@ -102,10 +101,10 @@ private[spark] class BlockManagerMaster(
/**
* Remove a block from the slaves that have it. This can only be used to remove
- * blocks that the master knows about.
+ * blocks that the driver knows about.
*/
def removeBlock(blockId: String) {
- askMasterWithRetry(RemoveBlock(blockId))
+ askDriverWithReply(RemoveBlock(blockId))
}
/**
@@ -115,33 +114,33 @@ private[spark] class BlockManagerMaster(
* amount of remaining memory.
*/
def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = {
- askMasterWithRetry[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
+ askDriverWithReply[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
}
- /** Stop the master actor, called only on the Spark master node */
+ /** Stop the driver actor, called only on the Spark driver node */
def stop() {
- if (masterActor != null) {
+ if (driverActor != null) {
tell(StopBlockManagerMaster)
- masterActor = null
+ driverActor = null
logInfo("BlockManagerMaster stopped")
}
}
/** Send a one-way message to the master actor, to which we expect it to reply with true. */
private def tell(message: Any) {
- if (!askMasterWithRetry[Boolean](message)) {
+ if (!askDriverWithReply[Boolean](message)) {
throw new SparkException("BlockManagerMasterActor returned false, expected true.")
}
}
/**
- * Send a message to the master actor and get its result within a default timeout, or
+ * Send a message to the driver actor and get its result within a default timeout, or
* throw a SparkException if this fails.
*/
- private def askMasterWithRetry[T](message: Any): T = {
+ private def askDriverWithReply[T](message: Any): T = {
// TODO: Consider removing multiple attempts
- if (masterActor == null) {
- throw new SparkException("Error sending message to BlockManager as masterActor is null " +
+ if (driverActor == null) {
+ throw new SparkException("Error sending message to BlockManager as driverActor is null " +
"[message = " + message + "]")
}
var attempts = 0
@@ -149,7 +148,7 @@ private[spark] class BlockManagerMaster(
while (attempts < AKKA_RETRY_ATTEMPTS) {
attempts += 1
try {
- val future = masterActor.ask(message)(timeout)
+ val future = driverActor.ask(message)(timeout)
val result = Await.result(future, timeout)
if (result == null) {
throw new Exception("BlockManagerMaster returned null")
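
The askDriverWithReply hunks above keep the same retry shape as before the rename: try the call a bounded number of times, pause between failures, and treat a null reply as a failure. A self-contained sketch of that loop, with the ask itself abstracted to a function argument:

```scala
object AskWithRetrySketch {
  val RETRY_ATTEMPTS: Int = System.getProperty("spark.akka.num.retries", "3").toInt
  val RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "3000").toInt

  // Try the call up to RETRY_ATTEMPTS times, sleeping between failures and
  // treating a null reply as an error, as askDriverWithReply does above.
  def askWithRetry[T](ask: () => T): T = {
    var attempts = 0
    var lastException: Exception = null
    while (attempts < RETRY_ATTEMPTS) {
      attempts += 1
      try {
        val result = ask()
        if (result == null) throw new Exception("driver returned null")
        return result
      } catch {
        case e: Exception =>
          lastException = e
          Thread.sleep(RETRY_INTERVAL_MS)
      }
    }
    throw new Exception("Failed after " + attempts + " attempts", lastException)
  }

  def main(args: Array[String]): Unit =
    println(askWithRetry(() => "pong"))
}
```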
diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
index f88517f1a3..2830bc6297 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
@@ -115,7 +115,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
}
def expireDeadHosts() {
- logDebug("Checking for hosts with no recent heart beats in BlockManagerMaster.")
+ logTrace("Checking for hosts with no recent heart beats in BlockManagerMaster.")
val now = System.currentTimeMillis()
val minSeenTime = now - slaveTimeout
val toRemove = new HashSet[BlockManagerId]
diff --git a/core/src/main/scala/spark/storage/ThreadingTest.scala b/core/src/main/scala/spark/storage/ThreadingTest.scala
index f04c046c31..a70d1c8e78 100644
--- a/core/src/main/scala/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/spark/storage/ThreadingTest.scala
@@ -75,9 +75,9 @@ private[spark] object ThreadingTest {
System.setProperty("spark.kryoserializer.buffer.mb", "1")
val actorSystem = ActorSystem("test")
val serializer = new KryoSerializer
- val masterIp: String = System.getProperty("spark.master.host", "localhost")
- val masterPort: Int = System.getProperty("spark.master.port", "7077").toInt
- val blockManagerMaster = new BlockManagerMaster(actorSystem, true, true, masterIp, masterPort)
+ val driverIp: String = System.getProperty("spark.driver.host", "localhost")
+ val driverPort: Int = System.getProperty("spark.driver.port", "7077").toInt
+ val blockManagerMaster = new BlockManagerMaster(actorSystem, true, true, driverIp, driverPort)
val blockManager = new BlockManager(
"<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024)
val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))
diff --git a/core/src/main/scala/spark/util/MetadataCleaner.scala b/core/src/main/scala/spark/util/MetadataCleaner.scala
index 51fb440108..eaff7ae581 100644
--- a/core/src/main/scala/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/spark/util/MetadataCleaner.scala
@@ -9,7 +9,6 @@ import spark.Logging
* Runs a timer task to periodically clean up metadata (e.g. old files or hashtable entries)
*/
class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging {
-
val delaySeconds = MetadataCleaner.getDelaySeconds
val periodSeconds = math.max(10, delaySeconds / 10)
val timer = new Timer(name + " cleanup timer", true)
@@ -27,8 +26,8 @@ class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging
if (delaySeconds > 0) {
logDebug(
- "Starting metadata cleaner for " + name + " with delay of " + delaySeconds + " seconds and "
- + "period of " + periodSeconds + " secs")
+ "Starting metadata cleaner for " + name + " with delay of " + delaySeconds + " seconds " +
+ "and period of " + periodSeconds + " secs")
timer.schedule(task, periodSeconds * 1000, periodSeconds * 1000)
}
@@ -39,7 +38,7 @@ class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging
object MetadataCleaner {
- def getDelaySeconds = (System.getProperty("spark.cleaner.delay", "-100").toDouble * 60).toInt
- def setDelaySeconds(delay: Long) { System.setProperty("spark.cleaner.delay", delay.toString) }
+ def getDelaySeconds = System.getProperty("spark.cleaner.delay", "-1").toInt
+ def setDelaySeconds(delay: Int) { System.setProperty("spark.cleaner.delay", delay.toString) }
}
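
After this change, spark.cleaner.delay is read directly as a number of seconds (with -1 meaning "disabled") rather than as minutes scaled by 60. A short sketch of the new semantics, reusing the period rule shown earlier in the MetadataCleaner diff:

```scala
object CleanerDelaySketch {
  // spark.cleaner.delay is now interpreted as seconds; -1 disables cleanup.
  def getDelaySeconds: Int = System.getProperty("spark.cleaner.delay", "-1").toInt

  // The cleanup period stays a tenth of the delay, with a 10 second floor.
  def periodSeconds(delaySeconds: Int): Int = math.max(10, delaySeconds / 10)

  def main(args: Array[String]): Unit = {
    System.setProperty("spark.cleaner.delay", "3600") // the one-hour value StreamingContext now sets
    val delay = getDelaySeconds
    println("delay=" + delay + "s, period=" + periodSeconds(delay) + "s")
  }
}
```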
diff --git a/core/src/test/scala/spark/AccumulatorSuite.scala b/core/src/test/scala/spark/AccumulatorSuite.scala
index 78d64a44ae..ac8ae7d308 100644
--- a/core/src/test/scala/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/spark/AccumulatorSuite.scala
@@ -17,6 +17,12 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkConte
val d = sc.parallelize(1 to 20)
d.foreach{x => acc += x}
acc.value should be (210)
+
+
+ val longAcc = sc.accumulator(0l)
+ val maxInt = Integer.MAX_VALUE.toLong
+ d.foreach{x => longAcc += maxInt + x}
+ longAcc.value should be (210l + maxInt * 20)
}
test ("value not assignable from tasks") {
diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala
index 33c317720c..0b74607fb8 100644
--- a/core/src/test/scala/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/spark/CheckpointSuite.scala
@@ -99,7 +99,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
// the parent RDD has been checkpointed and parent splits have been changed to HadoopSplits.
// Note that this test is very specific to the current implementation of CartesianRDD.
val ones = sc.makeRDD(1 to 100, 10).map(x => x)
- ones.checkpoint // checkpoint that MappedRDD
+ ones.checkpoint() // checkpoint that MappedRDD
val cartesian = new CartesianRDD(sc, ones, ones)
val splitBeforeCheckpoint =
serializeDeserialize(cartesian.splits.head.asInstanceOf[CartesianSplit])
@@ -125,7 +125,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
// the parent RDD has been checkpointed and parent splits have been changed to HadoopSplits.
// Note that this test is very specific to the current implementation of CoalescedRDDSplits
val ones = sc.makeRDD(1 to 100, 10).map(x => x)
- ones.checkpoint // checkpoint that MappedRDD
+ ones.checkpoint() // checkpoint that MappedRDD
val coalesced = new CoalescedRDD(ones, 2)
val splitBeforeCheckpoint =
serializeDeserialize(coalesced.splits.head.asInstanceOf[CoalescedRDDSplit])
@@ -160,7 +160,6 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
// so only the RDD will reduce in serialized size, not the splits.
testParentCheckpointing(
rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)), true, false)
-
}
/**
@@ -176,7 +175,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
testRDDSplitSize: Boolean = false
) {
// Generate the final RDD using given RDD operation
- val baseRDD = generateLongLineageRDD
+ val baseRDD = generateLongLineageRDD()
val operatedRDD = op(baseRDD)
val parentRDD = operatedRDD.dependencies.headOption.orNull
val rddType = operatedRDD.getClass.getSimpleName
@@ -245,12 +244,16 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
testRDDSplitSize: Boolean
) {
// Generate the final RDD using given RDD operation
- val baseRDD = generateLongLineageRDD
+ val baseRDD = generateLongLineageRDD()
val operatedRDD = op(baseRDD)
val parentRDD = operatedRDD.dependencies.head.rdd
val rddType = operatedRDD.getClass.getSimpleName
val parentRDDType = parentRDD.getClass.getSimpleName
+ // Get the splits and dependencies of the parent in case they're lazily computed
+ parentRDD.dependencies
+ parentRDD.splits
+
// Find serialized sizes before and after the checkpoint
val (rddSizeBeforeCheckpoint, splitSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
parentRDD.checkpoint() // checkpoint the parent RDD, not the generated one
@@ -267,7 +270,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
if (testRDDSize) {
assert(
rddSizeAfterCheckpoint < rddSizeBeforeCheckpoint,
- "Size of " + rddType + " did not reduce after parent checkpointing parent " + parentRDDType +
+ "Size of " + rddType + " did not reduce after checkpointing parent " + parentRDDType +
"[" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]"
)
}
@@ -318,10 +321,12 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
}
/**
- * Get serialized sizes of the RDD and its splits
+ * Get serialized sizes of the RDD and its splits, in order to test whether the size shrinks
+ * upon checkpointing. Ignores the checkpointData field, which may grow when we checkpoint.
*/
def getSerializedSizes(rdd: RDD[_]): (Int, Int) = {
- (Utils.serialize(rdd).size, Utils.serialize(rdd.splits).size)
+ (Utils.serialize(rdd).length - Utils.serialize(rdd.checkpointData).length,
+ Utils.serialize(rdd.splits).length)
}
/**
diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java
index f50ba093e9..934e4c2f67 100644
--- a/core/src/test/scala/spark/JavaAPISuite.java
+++ b/core/src/test/scala/spark/JavaAPISuite.java
@@ -46,7 +46,7 @@ public class JavaAPISuite implements Serializable {
sc.stop();
sc = null;
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port");
+ System.clearProperty("spark.driver.port");
}
static class ReverseIntComparator implements Comparator<Integer>, Serializable {
diff --git a/core/src/test/scala/spark/LocalSparkContext.scala b/core/src/test/scala/spark/LocalSparkContext.scala
index b5e31ddae3..ff00dd05dd 100644
--- a/core/src/test/scala/spark/LocalSparkContext.scala
+++ b/core/src/test/scala/spark/LocalSparkContext.scala
@@ -26,7 +26,7 @@ object LocalSparkContext {
def stop(sc: SparkContext) {
sc.stop()
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port")
+ System.clearProperty("spark.driver.port")
}
/** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */
diff --git a/core/src/test/scala/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
index e8fe7ecabc..f4e7ec39fe 100644
--- a/core/src/test/scala/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
@@ -78,10 +78,10 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
test("remote fetch") {
try {
- System.clearProperty("spark.master.host") // In case some previous test had set it
+ System.clearProperty("spark.driver.host") // In case some previous test had set it
val (actorSystem, boundPort) =
AkkaUtils.createActorSystem("test", "localhost", 0)
- System.setProperty("spark.master.port", boundPort.toString)
+ System.setProperty("spark.driver.port", boundPort.toString)
val masterTracker = new MapOutputTracker(actorSystem, true)
val slaveTracker = new MapOutputTracker(actorSystem, false)
masterTracker.registerShuffle(10, 1)
@@ -106,7 +106,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
// failure should be cached
intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
} finally {
- System.clearProperty("spark.master.port")
+ System.clearProperty("spark.driver.port")
}
}
}
diff --git a/docs/configuration.md b/docs/configuration.md
index 036a0df480..a7054b4321 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -202,7 +202,7 @@ Apart from these, the following properties are also available, and may be useful
<td>10</td>
<td>
Maximum message size to allow in "control plane" communication (for serialized tasks and task
- results), in MB. Increase this if your tasks need to send back large results to the master
+ results), in MB. Increase this if your tasks need to send back large results to the driver
(e.g. using <code>collect()</code> on a large dataset).
</td>
</tr>
@@ -211,7 +211,7 @@ Apart from these, the following properties are also available, and may be useful
<td>4</td>
<td>
Number of actor threads to use for communication. Can be useful to increase on large clusters
- when the master has a lot of CPU cores.
+ when the driver has a lot of CPU cores.
</td>
</tr>
<tr>
@@ -222,17 +222,17 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
- <td>spark.master.host</td>
+ <td>spark.driver.host</td>
<td>(local hostname)</td>
<td>
- Hostname or IP address for the master to listen on.
+ Hostname or IP address for the driver to listen on.
</td>
</tr>
<tr>
- <td>spark.master.port</td>
+ <td>spark.driver.port</td>
<td>(random)</td>
<td>
- Port for the master to listen on.
+ Port for the driver to listen on.
</td>
</tr>
<tr>
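
Since the renamed spark.driver.host and spark.driver.port settings described above are plain Java system properties, a usage sketch is simply setting them before the context is created. The values below are examples only; by default the host is the local hostname and the port is chosen at random.

```scala
object DriverPropsSketch {
  def main(args: Array[String]): Unit = {
    // Example values, not defaults.
    System.setProperty("spark.driver.host", "192.168.1.10")
    System.setProperty("spark.driver.port", "7077")
    println(System.getProperty("spark.driver.host") + ":" +
      System.getProperty("spark.driver.port"))
  }
}
```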
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 46ab34f063..df7235756d 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -26,7 +26,7 @@ class PySparkTestCase(unittest.TestCase):
sys.path = self._old_sys_path
# To avoid Akka rebinding to the same port, since it doesn't unbind
# immediately on shutdown
- self.sc.jvm.System.clearProperty("spark.master.port")
+ self.sc.jvm.System.clearProperty("spark.driver.port")
class TestCheckpoint(PySparkTestCase):
diff --git a/repl/src/test/scala/spark/repl/ReplSuite.scala b/repl/src/test/scala/spark/repl/ReplSuite.scala
index db78d06d4f..43559b96d3 100644
--- a/repl/src/test/scala/spark/repl/ReplSuite.scala
+++ b/repl/src/test/scala/spark/repl/ReplSuite.scala
@@ -31,7 +31,7 @@ class ReplSuite extends FunSuite {
if (interp.sparkContext != null)
interp.sparkContext.stop()
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port")
+ System.clearProperty("spark.driver.port")
return out.toString
}
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index b11ef443dc..352f83fe0c 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -198,10 +198,10 @@ abstract class DStream[T: ClassManifest] (
metadataCleanerDelay < 0 || rememberDuration.milliseconds < metadataCleanerDelay * 1000,
"It seems you are doing some DStream window operation or setting a checkpoint interval " +
"which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " +
- "than " + rememberDuration.milliseconds + " milliseconds. But the Spark's metadata cleanup" +
- "delay is set to " + (metadataCleanerDelay / 60.0) + " minutes, which is not sufficient. Please set " +
- "the Java property 'spark.cleaner.delay' to more than " +
- math.ceil(rememberDuration.milliseconds.toDouble / 60000.0).toInt + " minutes."
+      "than " + rememberDuration.milliseconds / 1000 + " seconds. But Spark's metadata cleanup " +
+ "delay is set to " + metadataCleanerDelay + " seconds, which is not sufficient. Please " +
+ "set the Java property 'spark.cleaner.delay' to more than " +
+ math.ceil(rememberDuration.milliseconds / 1000.0).toInt + " seconds."
)
dependencies.foreach(_.validate())
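
The reworded validation message above now states everything in seconds: the cleaner delay must either be disabled or, once converted to milliseconds, exceed how long this DStream needs its generated RDDs remembered. A small sketch of that arithmetic, with illustrative names:

```scala
object CleanerDelayCheckSketch {
  // Minimum spark.cleaner.delay (in seconds) for a given remember duration.
  def requiredDelaySeconds(rememberDurationMs: Long): Int =
    math.ceil(rememberDurationMs / 1000.0).toInt

  // The check above passes if cleanup is disabled (< 0) or the delay,
  // converted to milliseconds, exceeds the remember duration.
  def sufficient(cleanerDelaySeconds: Long, rememberDurationMs: Long): Boolean =
    cleanerDelaySeconds < 0 || rememberDurationMs < cleanerDelaySeconds * 1000

  def main(args: Array[String]): Unit = {
    val rememberMs = 90 * 1000L
    println("need spark.cleaner.delay > " + requiredDelaySeconds(rememberMs) + "s; " +
      "is 3600s enough? " + sufficient(3600, rememberMs))
  }
}
```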
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 14500bdcb1..37ba524b48 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -389,7 +389,7 @@ object StreamingContext {
// Set the default cleaner delay to an hour if not already set.
// This should be sufficient for even 1 second interval.
if (MetadataCleaner.getDelaySeconds < 0) {
- MetadataCleaner.setDelaySeconds(60)
+ MetadataCleaner.setDelaySeconds(3600)
}
new SparkContext(master, frameworkName)
}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
index aa6be95f30..8c322dd698 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
@@ -153,8 +153,8 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log
/** A helper actor that communicates with the NetworkInputTracker */
private class NetworkReceiverActor extends Actor {
logInfo("Attempting to register with tracker")
- val ip = System.getProperty("spark.master.host", "localhost")
- val port = System.getProperty("spark.master.port", "7077").toInt
+ val ip = System.getProperty("spark.driver.host", "localhost")
+ val port = System.getProperty("spark.driver.port", "7077").toInt
val url = "akka://spark@%s:%s/user/NetworkInputTracker".format(ip, port)
val tracker = env.actorSystem.actorFor(url)
val timeout = 5.seconds
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index c84e7331c7..79d6093429 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -43,7 +43,7 @@ public class JavaAPISuite implements Serializable {
ssc = null;
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port");
+ System.clearProperty("spark.driver.port");
}
@Test
diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
index bfdf32c73e..4a036f0710 100644
--- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
@@ -10,7 +10,7 @@ class BasicOperationsSuite extends TestSuiteBase {
after {
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port")
+ System.clearProperty("spark.driver.port")
}
test("map") {
diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
index d2f32c189b..563a7d1458 100644
--- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
@@ -19,7 +19,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
FileUtils.deleteDirectory(new File(checkpointDir))
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port")
+ System.clearProperty("spark.driver.port")
}
var ssc: StreamingContext = null
diff --git a/streaming/src/test/scala/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/spark/streaming/FailureSuite.scala
index 7493ac1207..c4cfffbfc1 100644
--- a/streaming/src/test/scala/spark/streaming/FailureSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/FailureSuite.scala
@@ -24,7 +24,7 @@ class FailureSuite extends TestSuiteBase with BeforeAndAfter {
FileUtils.deleteDirectory(new File(checkpointDir))
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port")
+ System.clearProperty("spark.driver.port")
}
override def framework = "CheckpointSuite"
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index d7ba7a5d17..70ae6e3934 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -42,7 +42,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port")
+ System.clearProperty("spark.driver.port")
}
test("network input stream") {
diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
index 0c6e928835..cd9608df53 100644
--- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
@@ -13,7 +13,7 @@ class WindowOperationsSuite extends TestSuiteBase {
after {
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port")
+ System.clearProperty("spark.driver.port")
}
val largerSlideInput = Seq(