aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStephen Haberman <stephen@exigencecorp.com>2013-01-28 23:20:26 -0600
committerStephen Haberman <stephen@exigencecorp.com>2013-01-28 23:30:24 -0600
commit13368818af52cf7265156b4eb6bc635e1448b09b (patch)
treee6738ae4e22f63d7e4f7ef5d8fbe64c28a16b132
parent7dfb82a992d47491174d7929e31351d26cadfcda (diff)
parentf03d9760fd8ac67fd0865cb355ba75d2eff507fe (diff)
downloadspark-13368818af52cf7265156b4eb6bc635e1448b09b.tar.gz
spark-13368818af52cf7265156b4eb6bc635e1448b09b.tar.bz2
spark-13368818af52cf7265156b4eb6bc635e1448b09b.zip
Merge branch 'master' into driver
Conflicts: core/src/main/scala/spark/SparkContext.scala core/src/main/scala/spark/SparkEnv.scala core/src/main/scala/spark/deploy/LocalSparkCluster.scala core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala core/src/main/scala/spark/storage/BlockManagerMaster.scala core/src/main/scala/spark/storage/ThreadingTest.scala core/src/test/scala/spark/MapOutputTrackerSuite.scala
-rwxr-xr-xbin/start-master.sh3
-rw-r--r--core/src/main/scala/spark/MapOutputTracker.scala4
-rw-r--r--core/src/main/scala/spark/RDD.scala13
-rw-r--r--core/src/main/scala/spark/SparkContext.scala20
-rw-r--r--core/src/main/scala/spark/SparkEnv.scala9
-rw-r--r--core/src/main/scala/spark/Utils.scala17
-rw-r--r--core/src/main/scala/spark/api/java/JavaRDDLike.scala7
-rw-r--r--core/src/main/scala/spark/api/java/PairFlatMapWorkaround.java20
-rw-r--r--core/src/main/scala/spark/deploy/LocalSparkCluster.scala21
-rw-r--r--core/src/main/scala/spark/deploy/master/Master.scala4
-rw-r--r--core/src/main/scala/spark/deploy/master/MasterWebUI.scala6
-rw-r--r--core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala4
-rw-r--r--core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala6
-rw-r--r--core/src/main/scala/spark/executor/Executor.scala4
-rw-r--r--core/src/main/scala/spark/executor/MesosExecutorBackend.scala3
-rw-r--r--core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala24
-rw-r--r--core/src/main/scala/spark/scheduler/DAGScheduler.scala57
-rw-r--r--core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala2
-rw-r--r--core/src/main/scala/spark/scheduler/MapStatus.scala6
-rw-r--r--core/src/main/scala/spark/scheduler/ShuffleMapTask.scala46
-rw-r--r--core/src/main/scala/spark/scheduler/Stage.scala11
-rw-r--r--core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala2
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala110
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/SlaveResources.scala4
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala22
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala14
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala78
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala2
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala7
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala38
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala4
-rw-r--r--core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala2
-rw-r--r--core/src/main/scala/spark/storage/BlockManager.scala45
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerId.scala27
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerMaster.scala18
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerMasterActor.scala75
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerMessages.scala5
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerUI.scala85
-rw-r--r--core/src/main/scala/spark/storage/StorageLevel.scala8
-rw-r--r--core/src/main/scala/spark/storage/StorageUtils.scala78
-rw-r--r--core/src/main/scala/spark/storage/ThreadingTest.scala3
-rw-r--r--core/src/main/scala/spark/util/AkkaUtils.scala11
-rw-r--r--core/src/main/scala/spark/util/MetadataCleaner.scala13
-rw-r--r--core/src/main/scala/spark/util/TimeStampedHashMap.scala4
-rw-r--r--core/src/main/twirl/spark/common/layout.scala.html (renamed from core/src/main/twirl/spark/deploy/common/layout.scala.html)0
-rw-r--r--core/src/main/twirl/spark/deploy/master/index.scala.html2
-rw-r--r--core/src/main/twirl/spark/deploy/master/job_details.scala.html2
-rw-r--r--core/src/main/twirl/spark/deploy/worker/index.scala.html3
-rw-r--r--core/src/main/twirl/spark/storage/index.scala.html40
-rw-r--r--core/src/main/twirl/spark/storage/rdd.scala.html77
-rw-r--r--core/src/main/twirl/spark/storage/rdd_table.scala.html30
-rw-r--r--core/src/main/twirl/spark/storage/worker_table.scala.html24
-rw-r--r--core/src/test/scala/spark/DriverSuite.scala5
-rw-r--r--core/src/test/scala/spark/JavaAPISuite.java28
-rw-r--r--core/src/test/scala/spark/MapOutputTrackerSuite.scala69
-rw-r--r--core/src/test/scala/spark/storage/BlockManagerSuite.scala104
-rwxr-xr-xsbt/sbt2
57 files changed, 938 insertions, 390 deletions
diff --git a/bin/start-master.sh b/bin/start-master.sh
index a901b1c260..87feb261fe 100755
--- a/bin/start-master.sh
+++ b/bin/start-master.sh
@@ -26,7 +26,8 @@ fi
# Set SPARK_PUBLIC_DNS so the master report the correct webUI address to the slaves
if [ "$SPARK_PUBLIC_DNS" = "" ]; then
# If we appear to be running on EC2, use the public address by default:
- if [[ `hostname` == *ec2.internal ]]; then
+ # NOTE: ec2-metadata is installed on Amazon Linux AMI. Check based on that and hostname
+ if command -v ec2-metadata > /dev/null || [[ `hostname` == *ec2.internal ]]; then
export SPARK_PUBLIC_DNS=`wget -q -O - http://instance-data.ec2.internal/latest/meta-data/public-hostname`
fi
fi
diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
index d4f5164f7d..aaf433b324 100644
--- a/core/src/main/scala/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/spark/MapOutputTracker.scala
@@ -114,7 +114,7 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isDriver: Boolea
var array = mapStatuses(shuffleId)
if (array != null) {
array.synchronized {
- if (array(mapId) != null && array(mapId).address == bmAddress) {
+ if (array(mapId) != null && array(mapId).location == bmAddress) {
array(mapId) = null
}
}
@@ -277,7 +277,7 @@ private[spark] object MapOutputTracker {
throw new FetchFailedException(null, shuffleId, -1, reduceId,
new Exception("Missing an output location for shuffle " + shuffleId))
} else {
- (status.address, decompressSize(status.compressedSizes(reduceId)))
+ (status.location, decompressSize(status.compressedSizes(reduceId)))
}
}
}
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index c79f34342f..0d3857f9dd 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -94,6 +94,9 @@ abstract class RDD[T: ClassManifest](
/** How this RDD depends on any parent RDDs. */
protected def getDependencies(): List[Dependency[_]] = dependencies_
+ /** A friendly name for this RDD */
+ var name: String = null
+
/** Optionally overridden by subclasses to specify placement preferences. */
protected def getPreferredLocations(split: Split): Seq[String] = Nil
@@ -108,7 +111,13 @@ abstract class RDD[T: ClassManifest](
/** A unique ID for this RDD (within its SparkContext). */
val id = sc.newRddId()
- /**
+ /** Assign a name to this RDD */
+ def setName(_name: String) = {
+ name = _name
+ this
+ }
+
+ /**
* Set this RDD's storage level to persist its values across operations after the first time
* it is computed. Can only be called once on each RDD.
*/
@@ -119,6 +128,8 @@ abstract class RDD[T: ClassManifest](
"Cannot change storage level of an RDD after it was already assigned a level")
}
storageLevel = newLevel
+ // Register the RDD with the SparkContext
+ sc.persistentRdds(id) = this
this
}
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index d4991cb1e0..1ac262522c 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -1,6 +1,7 @@
package spark
import java.io._
+import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import java.net.{URI, URLClassLoader}
import java.lang.ref.WeakReference
@@ -43,6 +44,8 @@ import scheduler.{ResultTask, ShuffleMapTask, DAGScheduler, TaskScheduler}
import spark.scheduler.local.LocalScheduler
import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, ClusterScheduler}
import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
+import storage.BlockManagerUI
+import util.{MetadataCleaner, TimeStampedHashMap}
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -78,16 +81,27 @@ class SparkContext(
// Create the Spark execution environment (cache, map output tracker, etc)
private[spark] val env = SparkEnv.createFromSystemProperties(
+ "<driver>",
System.getProperty("spark.driver.host"),
System.getProperty("spark.driver.port").toInt,
true,
isLocal)
SparkEnv.set(env)
+ // Start the BlockManager UI
+ private[spark] val ui = new BlockManagerUI(
+ env.actorSystem, env.blockManager.master.driverActor, this)
+ ui.start()
+
// Used to store a URL for each static file/jar together with the file's local timestamp
private[spark] val addedFiles = HashMap[String, Long]()
private[spark] val addedJars = HashMap[String, Long]()
+ // Keeps track of all persisted RDDs
+ private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]()
+ private[spark] val metadataCleaner = new MetadataCleaner("SparkContext", this.cleanup)
+
+
// Add each JAR given through the constructor
jars.foreach { addJar(_) }
@@ -493,6 +507,7 @@ class SparkContext(
/** Shut down the SparkContext. */
def stop() {
if (dagScheduler != null) {
+ metadataCleaner.cancel()
dagScheduler.stop()
dagScheduler = null
taskScheduler = null
@@ -635,6 +650,11 @@ class SparkContext(
/** Register a new RDD, returning its RDD ID */
private[spark] def newRddId(): Int = nextRddId.getAndIncrement()
+
+ /** Called by MetadataCleaner to clean up the persistentRdds map periodically */
+ private[spark] def cleanup(cleanupTime: Long) {
+ persistentRdds.clearOldValues(cleanupTime)
+ }
}
/**
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 4034af610c..d2193ae72b 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -19,6 +19,7 @@ import spark.util.AkkaUtils
* SparkEnv.get (e.g. after creating a SparkContext) and set it with SparkEnv.set.
*/
class SparkEnv (
+ val executorId: String,
val actorSystem: ActorSystem,
val serializer: Serializer,
val closureSerializer: Serializer,
@@ -58,11 +59,12 @@ object SparkEnv extends Logging {
}
def createFromSystemProperties(
+ executorId: String,
hostname: String,
port: Int,
isDriver: Boolean,
- isLocal: Boolean
- ) : SparkEnv = {
+ isLocal: Boolean): SparkEnv = {
+
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port)
// Bit of a hack: If this is the driver and our port was 0 (meaning bind to any free port),
@@ -86,7 +88,7 @@ object SparkEnv extends Logging {
val driverPort: Int = System.getProperty("spark.driver.port", "7077").toInt
val blockManagerMaster = new BlockManagerMaster(
actorSystem, isDriver, isLocal, driverIp, driverPort)
- val blockManager = new BlockManager(actorSystem, blockManagerMaster, serializer)
+ val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer)
val connectionManager = blockManager.connectionManager
@@ -122,6 +124,7 @@ object SparkEnv extends Logging {
}
new SparkEnv(
+ executorId,
actorSystem,
serializer,
closureSerializer,
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index ae77264372..1e58d01273 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -1,7 +1,7 @@
package spark
import java.io._
-import java.net.{NetworkInterface, InetAddress, Inet4Address, URL, URI}
+import java.net._
import java.util.{Locale, Random, UUID}
import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor}
import org.apache.hadoop.conf.Configuration
@@ -11,6 +11,7 @@ import scala.collection.JavaConversions._
import scala.io.Source
import com.google.common.io.Files
import com.google.common.util.concurrent.ThreadFactoryBuilder
+import scala.Some
/**
* Various utility methods used by Spark.
@@ -431,4 +432,18 @@ private object Utils extends Logging {
}
"%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine)
}
+
+ /**
+ * Try to find a free port to bind to on the local host. This should ideally never be needed,
+ * except that, unfortunately, some of the networking libraries we currently rely on (e.g. Spray)
+ * don't let users bind to port 0 and then figure out which free port they actually bound to.
+ * We work around this by binding a ServerSocket and immediately unbinding it. This is *not*
+ * necessarily guaranteed to work, but it's the best we can do.
+ */
+ def findFreePort(): Int = {
+ val socket = new ServerSocket(0)
+ val portBound = socket.getLocalPort
+ socket.close()
+ portBound
+ }
}
diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
index b3698ffa44..4c95c989b5 100644
--- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
@@ -12,7 +12,7 @@ import spark.storage.StorageLevel
import com.google.common.base.Optional
-trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
+trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround[T] {
def wrapRDD(rdd: RDD[T]): This
implicit val classManifest: ClassManifest[T]
@@ -82,10 +82,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
}
/**
- * Return a new RDD by first applying a function to all elements of this
- * RDD, and then flattening the results.
+ * Part of the workaround for SPARK-668; called in PairFlatMapWorkaround.java.
*/
- def flatMap[K, V](f: PairFlatMapFunction[T, K, V]): JavaPairRDD[K, V] = {
+ private[spark] def doFlatMap[K, V](f: PairFlatMapFunction[T, K, V]): JavaPairRDD[K, V] = {
import scala.collection.JavaConverters._
def fn = (x: T) => f.apply(x).asScala
def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
diff --git a/core/src/main/scala/spark/api/java/PairFlatMapWorkaround.java b/core/src/main/scala/spark/api/java/PairFlatMapWorkaround.java
new file mode 100644
index 0000000000..68b6fd6622
--- /dev/null
+++ b/core/src/main/scala/spark/api/java/PairFlatMapWorkaround.java
@@ -0,0 +1,20 @@
+package spark.api.java;
+
+import spark.api.java.JavaPairRDD;
+import spark.api.java.JavaRDDLike;
+import spark.api.java.function.PairFlatMapFunction;
+
+import java.io.Serializable;
+
+/**
+ * Workaround for SPARK-668.
+ */
+class PairFlatMapWorkaround<T> implements Serializable {
+ /**
+ * Return a new RDD by first applying a function to all elements of this
+ * RDD, and then flattening the results.
+ */
+ public <K, V> JavaPairRDD<K, V> flatMap(PairFlatMapFunction<T, K, V> f) {
+ return ((JavaRDDLike <T, ?>) this).doFlatMap(f);
+ }
+}
diff --git a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
index ae083efc8d..2836574ecb 100644
--- a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
@@ -9,6 +9,12 @@ import spark.{Logging, Utils}
import scala.collection.mutable.ArrayBuffer
+/**
+ * Testing class that creates a Spark standalone process in-cluster (that is, running the
+ * spark.deploy.master.Master and spark.deploy.worker.Workers in the same JVMs). Executors launched
+ * by the Workers still run in separate JVMs. This can be used to test distributed operation and
+ * fault recovery without spinning up a lot of processes.
+ */
private[spark]
class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: Int) extends Logging {
@@ -32,18 +38,15 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I
masterActor = masterActorSystem.actorOf(
Props(new Master(localIpAddress, masterPort, 0)), name = "Master")
- /* Start the Workers */
+ /* Start the Slaves */
for (workerNum <- 1 to numWorkers) {
- /* We can pretend to test distributed stuff by giving the workers distinct hostnames.
- All of 127/8 should be a loopback, we use 127.100.*.* in hopes that it is
- sufficiently distinctive. */
- val workerIpAddress = "127.100.0." + (workerNum % 256)
val (actorSystem, boundPort) =
- AkkaUtils.createActorSystem("sparkWorker" + workerNum, workerIpAddress, 0)
+ AkkaUtils.createActorSystem("sparkWorker" + workerNum, localIpAddress, 0)
workerActorSystems += actorSystem
- workerActors += actorSystem.actorOf(
- Props(new Worker(workerIpAddress, boundPort, 0, coresPerWorker, memoryPerWorker, masterUrl)),
- name = "Worker")
+ val actor = actorSystem.actorOf(
+ Props(new Worker(localIpAddress, boundPort, 0, coresPerWorker, memoryPerWorker, masterUrl)),
+ name = "Worker")
+ workerActors += actor
}
return masterUrl
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 3347207c6d..bc53b70015 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -97,10 +97,10 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
exec.worker.removeExecutor(exec)
// Only retry certain number of times so we don't go into an infinite loop.
- if (jobInfo.incrementRetryCount <= JobState.MAX_NUM_RETRY) {
+ if (jobInfo.incrementRetryCount < JobState.MAX_NUM_RETRY) {
schedule()
} else {
- val e = new SparkException("Job %s wth ID %s failed %d times.".format(
+ val e = new SparkException("Job %s with ID %s failed %d times.".format(
jobInfo.desc.name, jobInfo.id, jobInfo.retryCount))
logError(e.getMessage, e)
throw e
diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
index 458ee2d665..a01774f511 100644
--- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
@@ -14,12 +14,15 @@ import cc.spray.typeconversion.SprayJsonSupport._
import spark.deploy._
import spark.deploy.JsonProtocol._
+/**
+ * Web UI server for the standalone master.
+ */
private[spark]
class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Directives {
val RESOURCE_DIR = "spark/deploy/master/webui"
val STATIC_RESOURCE_DIR = "spark/deploy/static"
- implicit val timeout = Timeout(1 seconds)
+ implicit val timeout = Timeout(10 seconds)
val handler = {
get {
@@ -76,5 +79,4 @@ class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Direct
getFromResourceDirectory(RESOURCE_DIR)
}
}
-
}
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index 0d1fe2a6b4..f5ff267d44 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -65,9 +65,9 @@ private[spark] class ExecutorRunner(
}
}
- /** Replace variables such as {{SLAVEID}} and {{CORES}} in a command argument passed to us */
+ /** Replace variables such as {{EXECUTOR_ID}} and {{CORES}} in a command argument passed to us */
def substituteVariables(argument: String): String = argument match {
- case "{{SLAVEID}}" => workerId
+ case "{{EXECUTOR_ID}}" => execId.toString
case "{{HOSTNAME}}" => hostname
case "{{CORES}}" => cores.toString
case other => other
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
index f9489d99fc..ef81f072a3 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
@@ -13,12 +13,15 @@ import cc.spray.typeconversion.SprayJsonSupport._
import spark.deploy.{WorkerState, RequestWorkerState}
import spark.deploy.JsonProtocol._
+/**
+ * Web UI server for the standalone worker.
+ */
private[spark]
class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Directives {
val RESOURCE_DIR = "spark/deploy/worker/webui"
val STATIC_RESOURCE_DIR = "spark/deploy/static"
- implicit val timeout = Timeout(1 seconds)
+ implicit val timeout = Timeout(10 seconds)
val handler = {
get {
@@ -50,5 +53,4 @@ class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Direct
getFromResourceDirectory(RESOURCE_DIR)
}
}
-
}
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 28d9d40d43..bd21ba719a 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -30,7 +30,7 @@ private[spark] class Executor extends Logging {
initLogging()
- def initialize(slaveHostname: String, properties: Seq[(String, String)]) {
+ def initialize(executorId: String, slaveHostname: String, properties: Seq[(String, String)]) {
// Make sure the local hostname we report matches the cluster scheduler's name for this host
Utils.setCustomHostname(slaveHostname)
@@ -64,7 +64,7 @@ private[spark] class Executor extends Logging {
)
// Initialize Spark environment (using system properties read above)
- env = SparkEnv.createFromSystemProperties(slaveHostname, 0, false, false)
+ env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
SparkEnv.set(env)
// Start worker thread pool
diff --git a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
index eeab3959c6..1ef88075ad 100644
--- a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
@@ -29,9 +29,10 @@ private[spark] class MesosExecutorBackend(executor: Executor)
executorInfo: ExecutorInfo,
frameworkInfo: FrameworkInfo,
slaveInfo: SlaveInfo) {
+ logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)
this.driver = driver
val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
- executor.initialize(slaveInfo.getHostname, properties)
+ executor.initialize(executorInfo.getExecutorId.getValue, slaveInfo.getHostname, properties)
}
override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
index f80f1b5274..e45288ff53 100644
--- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
@@ -8,16 +8,16 @@ import akka.actor.{ActorRef, Actor, Props}
import java.util.concurrent.{TimeUnit, ThreadPoolExecutor, SynchronousQueue}
import akka.remote.RemoteClientLifeCycleEvent
import spark.scheduler.cluster._
-import spark.scheduler.cluster.RegisteredSlave
+import spark.scheduler.cluster.RegisteredExecutor
import spark.scheduler.cluster.LaunchTask
-import spark.scheduler.cluster.RegisterSlaveFailed
-import spark.scheduler.cluster.RegisterSlave
+import spark.scheduler.cluster.RegisterExecutorFailed
+import spark.scheduler.cluster.RegisterExecutor
private[spark] class StandaloneExecutorBackend(
executor: Executor,
driverUrl: String,
- workerId: String,
+ executorId: String,
hostname: String,
cores: Int)
extends Actor
@@ -30,7 +30,7 @@ private[spark] class StandaloneExecutorBackend(
try {
logInfo("Connecting to driver: " + driverUrl)
driver = context.actorFor(driverUrl)
- driver ! RegisterSlave(workerId, hostname, cores)
+ driver ! RegisterExecutor(executorId, hostname, cores)
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
context.watch(driver) // Doesn't work with remote actors, but useful for testing
} catch {
@@ -41,11 +41,11 @@ private[spark] class StandaloneExecutorBackend(
}
override def receive = {
- case RegisteredSlave(sparkProperties) =>
+ case RegisteredExecutor(sparkProperties) =>
logInfo("Successfully registered with driver")
- executor.initialize(hostname, sparkProperties)
+ executor.initialize(executorId, hostname, sparkProperties)
- case RegisterSlaveFailed(message) =>
+ case RegisterExecutorFailed(message) =>
logError("Slave registration failed: " + message)
System.exit(1)
@@ -55,24 +55,24 @@ private[spark] class StandaloneExecutorBackend(
}
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
- driver ! StatusUpdate(workerId, taskId, state, data)
+ driver ! StatusUpdate(executorId, taskId, state, data)
}
}
private[spark] object StandaloneExecutorBackend {
- def run(driverUrl: String, workerId: String, hostname: String, cores: Int) {
+ def run(driverUrl: String, executorId: String, hostname: String, cores: Int) {
// Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
// before getting started with all our system properties, etc
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0)
val actor = actorSystem.actorOf(
- Props(new StandaloneExecutorBackend(new Executor, driverUrl, workerId, hostname, cores)),
+ Props(new StandaloneExecutorBackend(new Executor, driverUrl, executorId, hostname, cores)),
name = "Executor")
actorSystem.awaitTermination()
}
def main(args: Array[String]) {
if (args.length != 4) {
- System.err.println("Usage: StandaloneExecutorBackend <driverUrl> <workerId> <hostname> <cores>")
+ System.err.println("Usage: StandaloneExecutorBackend <driverUrl> <executorId> <hostname> <cores>")
System.exit(1)
}
run(args(0), args(1), args(2), args(3).toInt)
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index b320be8863..bd541d4207 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -35,12 +35,12 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
eventQueue.put(CompletionEvent(task, reason, result, accumUpdates))
}
- // Called by TaskScheduler when a host fails.
- override def hostLost(host: String) {
- eventQueue.put(HostLost(host))
+ // Called by TaskScheduler when an executor fails.
+ override def executorLost(execId: String) {
+ eventQueue.put(ExecutorLost(execId))
}
- // Called by TaskScheduler to cancel an entier TaskSet due to repeated failures.
+ // Called by TaskScheduler to cancel an entire TaskSet due to repeated failures.
override def taskSetFailed(taskSet: TaskSet, reason: String) {
eventQueue.put(TaskSetFailed(taskSet, reason))
}
@@ -54,8 +54,6 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
// resubmit failed stages
val POLL_TIMEOUT = 10L
- private val lock = new Object // Used for access to the entire DAGScheduler
-
private val eventQueue = new LinkedBlockingQueue[DAGSchedulerEvent]
val nextRunId = new AtomicInteger(0)
@@ -74,7 +72,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
// For tracking failed nodes, we use the MapOutputTracker's generation number, which is
// sent with every task. When we detect a node failing, we note the current generation number
- // and failed host, increment it for new tasks, and use this to ignore stray ShuffleMapTask
+ // and failed executor, increment it for new tasks, and use this to ignore stray ShuffleMapTask
// results.
// TODO: Garbage collect information about failure generations when we know there are no more
// stray messages to detect.
@@ -110,7 +108,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
}
def clearCacheLocs() {
- cacheLocs.clear
+ cacheLocs.clear()
}
/**
@@ -273,8 +271,8 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
submitStage(finalStage)
}
- case HostLost(host) =>
- handleHostLost(host)
+ case ExecutorLost(execId) =>
+ handleExecutorLost(execId)
case completion: CompletionEvent =>
handleTaskCompletion(completion)
@@ -337,9 +335,12 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
val rdd = job.finalStage.rdd
val split = rdd.splits(job.partitions(0))
val taskContext = new TaskContext(job.finalStage.id, job.partitions(0), 0)
- val result = job.func(taskContext, rdd.iterator(split, taskContext))
- taskContext.executeOnCompleteCallbacks()
- job.listener.taskSucceeded(0, result)
+ try {
+ val result = job.func(taskContext, rdd.iterator(split, taskContext))
+ job.listener.taskSucceeded(0, result)
+ } finally {
+ taskContext.executeOnCompleteCallbacks()
+ }
} catch {
case e: Exception =>
job.listener.jobFailed(e)
@@ -435,10 +436,10 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
case smt: ShuffleMapTask =>
val stage = idToStage(smt.stageId)
val status = event.result.asInstanceOf[MapStatus]
- val host = status.address.ip
- logInfo("ShuffleMapTask finished with host " + host)
- if (failedGeneration.contains(host) && smt.generation <= failedGeneration(host)) {
- logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + host)
+ val execId = status.location.executorId
+ logDebug("ShuffleMapTask finished on " + execId)
+ if (failedGeneration.contains(execId) && smt.generation <= failedGeneration(execId)) {
+ logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)
} else {
stage.addOutputLoc(smt.partition, status)
}
@@ -510,9 +511,9 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
// Remember that a fetch failed now; this is used to resubmit the broken
// stages later, after a small wait (to give other tasks the chance to fail)
lastFetchFailureTime = System.currentTimeMillis() // TODO: Use pluggable clock
- // TODO: mark the host as failed only if there were lots of fetch failures on it
+ // TODO: mark the executor as failed only if there were lots of fetch failures on it
if (bmAddress != null) {
- handleHostLost(bmAddress.ip, Some(task.generation))
+ handleExecutorLost(bmAddress.executorId, Some(task.generation))
}
case other =>
@@ -522,21 +523,21 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
}
/**
- * Responds to a host being lost. This is called inside the event loop so it assumes that it can
- * modify the scheduler's internal state. Use hostLost() to post a host lost event from outside.
+ * Responds to an executor being lost. This is called inside the event loop, so it assumes it can
+ * modify the scheduler's internal state. Use executorLost() to post a loss event from outside.
*
* Optionally the generation during which the failure was caught can be passed to avoid allowing
* stray fetch failures from possibly retriggering the detection of a node as lost.
*/
- def handleHostLost(host: String, maybeGeneration: Option[Long] = None) {
+ def handleExecutorLost(execId: String, maybeGeneration: Option[Long] = None) {
val currentGeneration = maybeGeneration.getOrElse(mapOutputTracker.getGeneration)
- if (!failedGeneration.contains(host) || failedGeneration(host) < currentGeneration) {
- failedGeneration(host) = currentGeneration
- logInfo("Host lost: " + host + " (generation " + currentGeneration + ")")
- env.blockManager.master.notifyADeadHost(host)
+ if (!failedGeneration.contains(execId) || failedGeneration(execId) < currentGeneration) {
+ failedGeneration(execId) = currentGeneration
+ logInfo("Executor lost: %s (generation %d)".format(execId, currentGeneration))
+ env.blockManager.master.removeExecutor(execId)
// TODO: This will be really slow if we keep accumulating shuffle map stages
for ((shuffleId, stage) <- shuffleToMapStage) {
- stage.removeOutputsOnHost(host)
+ stage.removeOutputsOnExecutor(execId)
val locs = stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray
mapOutputTracker.registerMapOutputs(shuffleId, locs, true)
}
@@ -545,7 +546,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
}
clearCacheLocs()
} else {
- logDebug("Additional host lost message for " + host +
+ logDebug("Additional executor lost message for " + execId +
"(generation " + currentGeneration + ")")
}
}
diff --git a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
index 3422a21d9d..b34fa78c07 100644
--- a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
@@ -28,7 +28,7 @@ private[spark] case class CompletionEvent(
accumUpdates: Map[Long, Any])
extends DAGSchedulerEvent
-private[spark] case class HostLost(host: String) extends DAGSchedulerEvent
+private[spark] case class ExecutorLost(execId: String) extends DAGSchedulerEvent
private[spark] case class TaskSetFailed(taskSet: TaskSet, reason: String) extends DAGSchedulerEvent
diff --git a/core/src/main/scala/spark/scheduler/MapStatus.scala b/core/src/main/scala/spark/scheduler/MapStatus.scala
index fae643f3a8..203abb917b 100644
--- a/core/src/main/scala/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/spark/scheduler/MapStatus.scala
@@ -8,19 +8,19 @@ import java.io.{ObjectOutput, ObjectInput, Externalizable}
* task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks.
* The map output sizes are compressed using MapOutputTracker.compressSize.
*/
-private[spark] class MapStatus(var address: BlockManagerId, var compressedSizes: Array[Byte])
+private[spark] class MapStatus(var location: BlockManagerId, var compressedSizes: Array[Byte])
extends Externalizable {
def this() = this(null, null) // For deserialization only
def writeExternal(out: ObjectOutput) {
- address.writeExternal(out)
+ location.writeExternal(out)
out.writeInt(compressedSizes.length)
out.write(compressedSizes)
}
def readExternal(in: ObjectInput) {
- address = BlockManagerId(in)
+ location = BlockManagerId(in)
compressedSizes = new Array[Byte](in.readInt())
in.readFully(compressedSizes)
}
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index 19f5328eee..83641a2a84 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -81,7 +81,7 @@ private[spark] class ShuffleMapTask(
with Externalizable
with Logging {
- def this() = this(0, null, null, 0, null)
+ protected def this() = this(0, null, null, 0, null)
var split = if (rdd == null) {
null
@@ -117,34 +117,34 @@ private[spark] class ShuffleMapTask(
override def run(attemptId: Long): MapStatus = {
val numOutputSplits = dep.partitioner.numPartitions
- val partitioner = dep.partitioner
val taskContext = new TaskContext(stageId, partition, attemptId)
+ try {
+ // Partition the map output.
+ val buckets = Array.fill(numOutputSplits)(new ArrayBuffer[(Any, Any)])
+ for (elem <- rdd.iterator(split, taskContext)) {
+ val pair = elem.asInstanceOf[(Any, Any)]
+ val bucketId = dep.partitioner.getPartition(pair._1)
+ buckets(bucketId) += pair
+ }
+ val bucketIterators = buckets.map(_.iterator)
- // Partition the map output.
- val buckets = Array.fill(numOutputSplits)(new ArrayBuffer[(Any, Any)])
- for (elem <- rdd.iterator(split, taskContext)) {
- val pair = elem.asInstanceOf[(Any, Any)]
- val bucketId = partitioner.getPartition(pair._1)
- buckets(bucketId) += pair
- }
- val bucketIterators = buckets.map(_.iterator)
+ val compressedSizes = new Array[Byte](numOutputSplits)
- val compressedSizes = new Array[Byte](numOutputSplits)
+ val blockManager = SparkEnv.get.blockManager
+ for (i <- 0 until numOutputSplits) {
+ val blockId = "shuffle_" + dep.shuffleId + "_" + partition + "_" + i
+ // Get a Scala iterator from Java map
+ val iter: Iterator[(Any, Any)] = bucketIterators(i)
+ val size = blockManager.put(blockId, iter, StorageLevel.DISK_ONLY, false)
+ compressedSizes(i) = MapOutputTracker.compressSize(size)
+ }
- val blockManager = SparkEnv.get.blockManager
- for (i <- 0 until numOutputSplits) {
- val blockId = "shuffle_" + dep.shuffleId + "_" + partition + "_" + i
- // Get a Scala iterator from Java map
- val iter: Iterator[(Any, Any)] = bucketIterators(i)
- val size = blockManager.put(blockId, iter, StorageLevel.DISK_ONLY, false)
- compressedSizes(i) = MapOutputTracker.compressSize(size)
+ return new MapStatus(blockManager.blockManagerId, compressedSizes)
+ } finally {
+ // Execute the callbacks on task completion.
+ taskContext.executeOnCompleteCallbacks()
}
-
- // Execute the callbacks on task completion.
- taskContext.executeOnCompleteCallbacks()
-
- return new MapStatus(blockManager.blockManagerId, compressedSizes)
}
override def preferredLocations: Seq[String] = locs
diff --git a/core/src/main/scala/spark/scheduler/Stage.scala b/core/src/main/scala/spark/scheduler/Stage.scala
index 4846b66729..e9419728e3 100644
--- a/core/src/main/scala/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/spark/scheduler/Stage.scala
@@ -51,18 +51,18 @@ private[spark] class Stage(
def removeOutputLoc(partition: Int, bmAddress: BlockManagerId) {
val prevList = outputLocs(partition)
- val newList = prevList.filterNot(_.address == bmAddress)
+ val newList = prevList.filterNot(_.location == bmAddress)
outputLocs(partition) = newList
if (prevList != Nil && newList == Nil) {
numAvailableOutputs -= 1
}
}
- def removeOutputsOnHost(host: String) {
+ def removeOutputsOnExecutor(execId: String) {
var becameUnavailable = false
for (partition <- 0 until numPartitions) {
val prevList = outputLocs(partition)
- val newList = prevList.filterNot(_.address.ip == host)
+ val newList = prevList.filterNot(_.location.executorId == execId)
outputLocs(partition) = newList
if (prevList != Nil && newList == Nil) {
becameUnavailable = true
@@ -70,7 +70,8 @@ private[spark] class Stage(
}
}
if (becameUnavailable) {
- logInfo("%s is now unavailable on %s (%d/%d, %s)".format(this, host, numAvailableOutputs, numPartitions, isAvailable))
+ logInfo("%s is now unavailable on executor %s (%d/%d, %s)".format(
+ this, execId, numAvailableOutputs, numPartitions, isAvailable))
}
}
@@ -82,7 +83,7 @@ private[spark] class Stage(
def origin: String = rdd.origin
- override def toString = "Stage " + id // + ": [RDD = " + rdd.id + ", isShuffle = " + isShuffleMap + "]"
+ override def toString = "Stage " + id
override def hashCode(): Int = id
}
diff --git a/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala b/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
index fa4de15d0d..9fcef86e46 100644
--- a/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
+++ b/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
@@ -12,7 +12,7 @@ private[spark] trait TaskSchedulerListener {
def taskEnded(task: Task[_], reason: TaskEndReason, result: Any, accumUpdates: Map[Long, Any]): Unit
// A node was lost from the cluster.
- def hostLost(host: String): Unit
+ def executorLost(execId: String): Unit
// The TaskScheduler wants to abort an entire task set.
def taskSetFailed(taskSet: TaskSet, reason: String): Unit
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index a639b72795..0b4177805b 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -27,19 +27,20 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
val taskIdToTaskSetId = new HashMap[Long, String]
- val taskIdToSlaveId = new HashMap[Long, String]
+ val taskIdToExecutorId = new HashMap[Long, String]
val taskSetTaskIds = new HashMap[String, HashSet[Long]]
// Incrementing Mesos task IDs
val nextTaskId = new AtomicLong(0)
- // Which hosts in the cluster are alive (contains hostnames)
- val hostsAlive = new HashSet[String]
+ // Which executor IDs we have executors on
+ val activeExecutorIds = new HashSet[String]
- // Which slave IDs we have executors on
- val slaveIdsWithExecutors = new HashSet[String]
+ // The set of executors we have on each host; this is used to compute hostsAlive, which
+ // in turn is used to decide when we can attain data locality on a given host
+ val executorsByHost = new HashMap[String, HashSet[String]]
- val slaveIdToHost = new HashMap[String, String]
+ val executorIdToHost = new HashMap[String, String]
// JAR server, if any JARs were added by the user to the SparkContext
var jarServer: HttpServer = null
@@ -102,7 +103,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
activeTaskSets -= manager.taskSet.id
activeTaskSetsQueue -= manager
taskIdToTaskSetId --= taskSetTaskIds(manager.taskSet.id)
- taskIdToSlaveId --= taskSetTaskIds(manager.taskSet.id)
+ taskIdToExecutorId --= taskSetTaskIds(manager.taskSet.id)
taskSetTaskIds.remove(manager.taskSet.id)
}
}
@@ -117,8 +118,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
SparkEnv.set(sc.env)
// Mark each slave as alive and remember its hostname
for (o <- offers) {
- slaveIdToHost(o.slaveId) = o.hostname
- hostsAlive += o.hostname
+ executorIdToHost(o.executorId) = o.hostname
}
// Build a list of tasks to assign to each slave
val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
@@ -128,16 +128,20 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
do {
launchedTask = false
for (i <- 0 until offers.size) {
- val sid = offers(i).slaveId
+ val execId = offers(i).executorId
val host = offers(i).hostname
- manager.slaveOffer(sid, host, availableCpus(i)) match {
+ manager.slaveOffer(execId, host, availableCpus(i)) match {
case Some(task) =>
tasks(i) += task
val tid = task.taskId
taskIdToTaskSetId(tid) = manager.taskSet.id
taskSetTaskIds(manager.taskSet.id) += tid
- taskIdToSlaveId(tid) = sid
- slaveIdsWithExecutors += sid
+ taskIdToExecutorId(tid) = execId
+ activeExecutorIds += execId
+ if (!executorsByHost.contains(host)) {
+ executorsByHost(host) = new HashSet()
+ }
+ executorsByHost(host) += execId
availableCpus(i) -= 1
launchedTask = true
@@ -152,25 +156,21 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
var taskSetToUpdate: Option[TaskSetManager] = None
- var failedHost: Option[String] = None
+ var failedExecutor: Option[String] = None
var taskFailed = false
synchronized {
try {
- if (state == TaskState.LOST && taskIdToSlaveId.contains(tid)) {
- // We lost the executor on this slave, so remember that it's gone
- val slaveId = taskIdToSlaveId(tid)
- val host = slaveIdToHost(slaveId)
- if (hostsAlive.contains(host)) {
- slaveIdsWithExecutors -= slaveId
- hostsAlive -= host
- activeTaskSetsQueue.foreach(_.hostLost(host))
- failedHost = Some(host)
+ if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
+ // We lost this entire executor, so remember that it's gone
+ val execId = taskIdToExecutorId(tid)
+ if (activeExecutorIds.contains(execId)) {
+ removeExecutor(execId)
+ failedExecutor = Some(execId)
}
}
taskIdToTaskSetId.get(tid) match {
case Some(taskSetId) =>
if (activeTaskSets.contains(taskSetId)) {
- //activeTaskSets(taskSetId).statusUpdate(status)
taskSetToUpdate = Some(activeTaskSets(taskSetId))
}
if (TaskState.isFinished(state)) {
@@ -178,7 +178,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
if (taskSetTaskIds.contains(taskSetId)) {
taskSetTaskIds(taskSetId) -= tid
}
- taskIdToSlaveId.remove(tid)
+ taskIdToExecutorId.remove(tid)
}
if (state == TaskState.FAILED) {
taskFailed = true
@@ -190,12 +190,12 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
case e: Exception => logError("Exception in statusUpdate", e)
}
}
- // Update the task set and DAGScheduler without holding a lock on this, because that can deadlock
+ // Update the task set and DAGScheduler without holding a lock on this, since that can deadlock
if (taskSetToUpdate != None) {
taskSetToUpdate.get.statusUpdate(tid, state, serializedData)
}
- if (failedHost != None) {
- listener.hostLost(failedHost.get)
+ if (failedExecutor != None) {
+ listener.executorLost(failedExecutor.get)
backend.reviveOffers()
}
if (taskFailed) {
@@ -249,32 +249,42 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
}
- def slaveLost(slaveId: String, reason: ExecutorLossReason) {
- var failedHost: Option[String] = None
+ def executorLost(executorId: String, reason: ExecutorLossReason) {
+ var failedExecutor: Option[String] = None
synchronized {
- slaveIdToHost.get(slaveId) match {
- case Some(host) =>
- if (hostsAlive.contains(host)) {
- logError("Lost an executor on " + host + ": " + reason)
- slaveIdsWithExecutors -= slaveId
- hostsAlive -= host
- activeTaskSetsQueue.foreach(_.hostLost(host))
- failedHost = Some(host)
- } else {
- // We may get multiple slaveLost() calls with different loss reasons. For example, one
- // may be triggered by a dropped connection from the slave while another may be a report
- // of executor termination from Mesos. We produce log messages for both so we eventually
- // report the termination reason.
- logError("Lost an executor on " + host + " (already removed): " + reason)
- }
- case None =>
- // We were told about a slave being lost before we could even allocate work to it
- logError("Lost slave " + slaveId + " (no work assigned yet)")
+ if (activeExecutorIds.contains(executorId)) {
+ val host = executorIdToHost(executorId)
+ logError("Lost executor %s on %s: %s".format(executorId, host, reason))
+ removeExecutor(executorId)
+ failedExecutor = Some(executorId)
+ } else {
+ // We may get multiple executorLost() calls with different loss reasons. For example, one
+ // may be triggered by a dropped connection from the slave while another may be a report
+ // of executor termination from Mesos. We produce log messages for both so we eventually
+ // report the termination reason.
+ logError("Lost an executor " + executorId + " (already removed): " + reason)
}
}
- if (failedHost != None) {
- listener.hostLost(failedHost.get)
+ // Call listener.executorLost without holding the lock on this to prevent deadlock
+ if (failedExecutor != None) {
+ listener.executorLost(failedExecutor.get)
backend.reviveOffers()
}
}
+
+ /** Get a list of hosts that currently have executors */
+ def hostsAlive: scala.collection.Set[String] = executorsByHost.keySet
+
+ /** Remove an executor from all our data structures and mark it as lost */
+ private def removeExecutor(executorId: String) {
+ activeExecutorIds -= executorId
+ val host = executorIdToHost(executorId)
+ val execs = executorsByHost.getOrElse(host, new HashSet)
+ execs -= executorId
+ if (execs.isEmpty) {
+ executorsByHost -= host
+ }
+ executorIdToHost -= executorId
+ activeTaskSetsQueue.foreach(_.executorLost(executorId, host))
+ }
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/SlaveResources.scala b/core/src/main/scala/spark/scheduler/cluster/SlaveResources.scala
deleted file mode 100644
index 96ebaa4601..0000000000
--- a/core/src/main/scala/spark/scheduler/cluster/SlaveResources.scala
+++ /dev/null
@@ -1,4 +0,0 @@
-package spark.scheduler.cluster
-
-private[spark]
-class SlaveResources(val slaveId: String, val hostname: String, val coresFree: Int) {}
diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 866beb6d01..9760d23072 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -19,7 +19,6 @@ private[spark] class SparkDeploySchedulerBackend(
var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
- val executorIdToWorkerId = new HashMap[String, String]
// Memory used by each executor (in megabytes)
val executorMemory = {
@@ -38,7 +37,7 @@ private[spark] class SparkDeploySchedulerBackend(
val driverUrl = "akka://spark@%s:%s/user/%s".format(
System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
StandaloneSchedulerBackend.ACTOR_NAME)
- val args = Seq(driverUrl, "{{SLAVEID}}", "{{HOSTNAME}}", "{{CORES}}")
+ val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
val command = Command("spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)
val sparkHome = sc.getSparkHome().getOrElse(throw new IllegalArgumentException("must supply spark home for spark standalone"))
val jobDesc = new JobDescription(jobName, maxCores, executorMemory, command, sparkHome)
@@ -48,7 +47,7 @@ private[spark] class SparkDeploySchedulerBackend(
}
override def stop() {
- stopping = true;
+ stopping = true
super.stop()
client.stop()
if (shutdownCallback != null) {
@@ -67,24 +66,17 @@ private[spark] class SparkDeploySchedulerBackend(
}
}
- override def executorAdded(fullId: String, workerId: String, host: String, cores: Int, memory: Int) {
- executorIdToWorkerId += fullId -> workerId
+ override def executorAdded(executorId: String, workerId: String, host: String, cores: Int, memory: Int) {
logInfo("Granted executor ID %s on host %s with %d cores, %s RAM".format(
- fullId, host, cores, Utils.memoryMegabytesToString(memory)))
+ executorId, host, cores, Utils.memoryMegabytesToString(memory)))
}
- override def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]) {
+ override def executorRemoved(executorId: String, message: String, exitStatus: Option[Int]) {
val reason: ExecutorLossReason = exitStatus match {
case Some(code) => ExecutorExited(code)
case None => SlaveLost(message)
}
- logInfo("Executor %s removed: %s".format(fullId, message))
- executorIdToWorkerId.get(fullId) match {
- case Some(workerId) =>
- executorIdToWorkerId.remove(fullId)
- scheduler.slaveLost(workerId, reason)
- case None =>
- logInfo("No worker ID known for executor %s".format(fullId))
- }
+ logInfo("Executor %s removed: %s".format(executorId, message))
+ scheduler.executorLost(executorId, reason)
}
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
index bea9dc4f23..da7dcf4b6b 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
@@ -11,24 +11,26 @@ private[spark]
case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage
private[spark]
-case class RegisteredSlave(sparkProperties: Seq[(String, String)]) extends StandaloneClusterMessage
+case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
+ extends StandaloneClusterMessage
private[spark]
-case class RegisterSlaveFailed(message: String) extends StandaloneClusterMessage
+case class RegisterExecutorFailed(message: String) extends StandaloneClusterMessage
// Executors to driver
private[spark]
-case class RegisterSlave(slaveId: String, host: String, cores: Int) extends StandaloneClusterMessage
+case class RegisterExecutor(executorId: String, host: String, cores: Int)
+ extends StandaloneClusterMessage
private[spark]
-case class StatusUpdate(slaveId: String, taskId: Long, state: TaskState, data: SerializableBuffer)
+case class StatusUpdate(executorId: String, taskId: Long, state: TaskState, data: SerializableBuffer)
extends StandaloneClusterMessage
private[spark]
object StatusUpdate {
/** Alternate factory method that takes a ByteBuffer directly for the data field */
- def apply(slaveId: String, taskId: Long, state: TaskState, data: ByteBuffer): StatusUpdate = {
- StatusUpdate(slaveId, taskId, state, new SerializableBuffer(data))
+ def apply(executorId: String, taskId: Long, state: TaskState, data: ByteBuffer): StatusUpdate = {
+ StatusUpdate(executorId, taskId, state, new SerializableBuffer(data))
}
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index d742a7b2bf..082022be1c 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -24,12 +24,12 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
var totalCoreCount = new AtomicInteger(0)
class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
- val slaveActor = new HashMap[String, ActorRef]
- val slaveAddress = new HashMap[String, Address]
- val slaveHost = new HashMap[String, String]
+ val executorActor = new HashMap[String, ActorRef]
+ val executorAddress = new HashMap[String, Address]
+ val executorHost = new HashMap[String, String]
val freeCores = new HashMap[String, Int]
- val actorToSlaveId = new HashMap[ActorRef, String]
- val addressToSlaveId = new HashMap[Address, String]
+ val actorToExecutorId = new HashMap[ActorRef, String]
+ val addressToExecutorId = new HashMap[Address, String]
override def preStart() {
// Listen for remote client disconnection events, since they don't go through Akka's watch()
@@ -37,28 +37,28 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
}
def receive = {
- case RegisterSlave(workerId, host, cores) =>
- if (slaveActor.contains(workerId)) {
- sender ! RegisterSlaveFailed("Duplicate slave ID: " + workerId)
+ case RegisterExecutor(executorId, host, cores) =>
+ if (executorActor.contains(executorId)) {
+ sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
} else {
- logInfo("Registered slave: " + sender + " with ID " + workerId)
- sender ! RegisteredSlave(sparkProperties)
+ logInfo("Registered executor: " + sender + " with ID " + executorId)
+ sender ! RegisteredExecutor(sparkProperties)
context.watch(sender)
- slaveActor(workerId) = sender
- slaveHost(workerId) = host
- freeCores(workerId) = cores
- slaveAddress(workerId) = sender.path.address
- actorToSlaveId(sender) = workerId
- addressToSlaveId(sender.path.address) = workerId
+ executorActor(executorId) = sender
+ executorHost(executorId) = host
+ freeCores(executorId) = cores
+ executorAddress(executorId) = sender.path.address
+ actorToExecutorId(sender) = executorId
+ addressToExecutorId(sender.path.address) = executorId
totalCoreCount.addAndGet(cores)
makeOffers()
}
- case StatusUpdate(workerId, taskId, state, data) =>
+ case StatusUpdate(executorId, taskId, state, data) =>
scheduler.statusUpdate(taskId, state, data.value)
if (TaskState.isFinished(state)) {
- freeCores(workerId) += 1
- makeOffers(workerId)
+ freeCores(executorId) += 1
+ makeOffers(executorId)
}
case ReviveOffers =>
@@ -69,47 +69,47 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
context.stop(self)
case Terminated(actor) =>
- actorToSlaveId.get(actor).foreach(removeSlave(_, "Akka actor terminated"))
+ actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated"))
case RemoteClientDisconnected(transport, address) =>
- addressToSlaveId.get(address).foreach(removeSlave(_, "remote Akka client disconnected"))
+ addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disconnected"))
case RemoteClientShutdown(transport, address) =>
- addressToSlaveId.get(address).foreach(removeSlave(_, "remote Akka client shutdown"))
+ addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client shutdown"))
}
- // Make fake resource offers on all slaves
+ // Make fake resource offers on all executors
def makeOffers() {
launchTasks(scheduler.resourceOffers(
- slaveHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
+ executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
}
- // Make fake resource offers on just one slave
- def makeOffers(workerId: String) {
+ // Make fake resource offers on just one executor
+ def makeOffers(executorId: String) {
launchTasks(scheduler.resourceOffers(
- Seq(new WorkerOffer(workerId, slaveHost(workerId), freeCores(workerId)))))
+ Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))
}
// Launch tasks returned by a set of resource offers
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
- freeCores(task.slaveId) -= 1
- slaveActor(task.slaveId) ! LaunchTask(task)
+ freeCores(task.executorId) -= 1
+ executorActor(task.executorId) ! LaunchTask(task)
}
}
// Remove a disconnected slave from the cluster
- def removeSlave(workerId: String, reason: String) {
- logInfo("Slave " + workerId + " disconnected, so removing it")
- val numCores = freeCores(workerId)
- actorToSlaveId -= slaveActor(workerId)
- addressToSlaveId -= slaveAddress(workerId)
- slaveActor -= workerId
- slaveHost -= workerId
- freeCores -= workerId
- slaveHost -= workerId
+ def removeExecutor(executorId: String, reason: String) {
+ logInfo("Slave " + executorId + " disconnected, so removing it")
+ val numCores = freeCores(executorId)
+ actorToExecutorId -= executorActor(executorId)
+ addressToExecutorId -= executorAddress(executorId)
+ executorActor -= executorId
+ executorHost -= executorId
+ freeCores -= executorId
+ executorHost -= executorId
totalCoreCount.addAndGet(-numCores)
- scheduler.slaveLost(workerId, SlaveLost(reason))
+ scheduler.executorLost(executorId, SlaveLost(reason))
}
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
index aa097fd3a2..b41e951be9 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
@@ -5,7 +5,7 @@ import spark.util.SerializableBuffer
private[spark] class TaskDescription(
val taskId: Long,
- val slaveId: String,
+ val executorId: String,
val name: String,
_serializedTask: ByteBuffer)
extends Serializable {
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala b/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala
index ca84503780..0f975ce1eb 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala
@@ -4,7 +4,12 @@ package spark.scheduler.cluster
* Information about a running task attempt inside a TaskSet.
*/
private[spark]
-class TaskInfo(val taskId: Long, val index: Int, val launchTime: Long, val host: String) {
+class TaskInfo(
+ val taskId: Long,
+ val index: Int,
+ val launchTime: Long,
+ val executorId: String,
+ val host: String) {
var finishTime: Long = 0
var failed = false
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index a089b71644..26201ad0dd 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -138,10 +138,11 @@ private[spark] class TaskSetManager(
// attempt running on this host, in case the host is slow. In addition, if localOnly is set, the
// task must have a preference for this host (or no preferred locations at all).
def findSpeculativeTask(host: String, localOnly: Boolean): Option[Int] = {
+ val hostsAlive = sched.hostsAlive
speculatableTasks.retain(index => !finished(index)) // Remove finished tasks from set
val localTask = speculatableTasks.find {
index =>
- val locations = tasks(index).preferredLocations.toSet & sched.hostsAlive
+ val locations = tasks(index).preferredLocations.toSet & hostsAlive
val attemptLocs = taskAttempts(index).map(_.host)
(locations.size == 0 || locations.contains(host)) && !attemptLocs.contains(host)
}
@@ -189,7 +190,7 @@ private[spark] class TaskSetManager(
}
// Respond to an offer of a single slave from the scheduler by finding a task
- def slaveOffer(slaveId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
+ def slaveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) {
val time = System.currentTimeMillis
val localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT)
@@ -206,11 +207,11 @@ private[spark] class TaskSetManager(
} else {
"non-preferred, not one of " + task.preferredLocations.mkString(", ")
}
- logInfo("Starting task %s:%d as TID %s on slave %s: %s (%s)".format(
- taskSet.id, index, taskId, slaveId, host, prefStr))
+ logInfo("Starting task %s:%d as TID %s on executor %s: %s (%s)".format(
+ taskSet.id, index, taskId, execId, host, prefStr))
// Do various bookkeeping
copiesRunning(index) += 1
- val info = new TaskInfo(taskId, index, time, host)
+ val info = new TaskInfo(taskId, index, time, execId, host)
taskInfos(taskId) = info
taskAttempts(index) = info :: taskAttempts(index)
if (preferred) {
@@ -224,7 +225,7 @@ private[spark] class TaskSetManager(
logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
taskSet.id, index, serializedTask.limit, timeTaken))
val taskName = "task %s:%d".format(taskSet.id, index)
- return Some(new TaskDescription(taskId, slaveId, taskName, serializedTask))
+ return Some(new TaskDescription(taskId, execId, taskName, serializedTask))
}
case _ =>
}
@@ -356,19 +357,22 @@ private[spark] class TaskSetManager(
sched.taskSetFinished(this)
}
- def hostLost(hostname: String) {
- logInfo("Re-queueing tasks for " + hostname + " from TaskSet " + taskSet.id)
- // If some task has preferred locations only on hostname, put it in the no-prefs list
- // to avoid the wait from delay scheduling
- for (index <- getPendingTasksForHost(hostname)) {
- val newLocs = tasks(index).preferredLocations.toSet & sched.hostsAlive
- if (newLocs.isEmpty) {
- pendingTasksWithNoPrefs += index
+ def executorLost(execId: String, hostname: String) {
+ logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id)
+ val newHostsAlive = sched.hostsAlive
+ // If some task has preferred locations only on hostname, and there are no more executors there,
+ // put it in the no-prefs list to avoid the wait from delay scheduling
+ if (!newHostsAlive.contains(hostname)) {
+ for (index <- getPendingTasksForHost(hostname)) {
+ val newLocs = tasks(index).preferredLocations.toSet & newHostsAlive
+ if (newLocs.isEmpty) {
+ pendingTasksWithNoPrefs += index
+ }
}
}
- // Re-enqueue any tasks that ran on the failed host if this is a shuffle map stage
+ // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage
if (tasks(0).isInstanceOf[ShuffleMapTask]) {
- for ((tid, info) <- taskInfos if info.host == hostname) {
+ for ((tid, info) <- taskInfos if info.executorId == execId) {
val index = taskInfos(tid).index
if (finished(index)) {
finished(index) = false
@@ -382,7 +386,7 @@ private[spark] class TaskSetManager(
}
}
// Also re-enqueue any tasks that were running on the node
- for ((tid, info) <- taskInfos if info.running && info.host == hostname) {
+ for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
taskLost(tid, TaskState.KILLED, null)
}
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala b/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala
index 6b919d68b2..3c3afcbb14 100644
--- a/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala
@@ -1,8 +1,8 @@
package spark.scheduler.cluster
/**
- * Represents free resources available on a worker node.
+ * Represents free resources available on an executor.
*/
private[spark]
-class WorkerOffer(val slaveId: String, val hostname: String, val cores: Int) {
+class WorkerOffer(val executorId: String, val hostname: String, val cores: Int) {
}
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
index 2989e31f5e..f3467db86b 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -268,7 +268,7 @@ private[spark] class MesosSchedulerBackend(
synchronized {
slaveIdsWithExecutors -= slaveId.getValue
}
- scheduler.slaveLost(slaveId.getValue, reason)
+ scheduler.executorLost(slaveId.getValue, reason)
}
override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 19cdaaa984..1215d5f5c8 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -30,6 +30,7 @@ extends Exception(message)
private[spark]
class BlockManager(
+ executorId: String,
actorSystem: ActorSystem,
val master: BlockManagerMaster,
val serializer: Serializer,
@@ -68,8 +69,8 @@ class BlockManager(
val connectionManager = new ConnectionManager(0)
implicit val futureExecContext = connectionManager.futureExecContext
- val connectionManagerId = connectionManager.id
- val blockManagerId = BlockManagerId(connectionManagerId.host, connectionManagerId.port)
+ val blockManagerId = BlockManagerId(
+ executorId, connectionManager.id.host, connectionManager.id.port)
// Max megabytes of data to keep in flight per reducer (to avoid over-allocating memory
// for receiving shuffle outputs)
@@ -90,7 +91,10 @@ class BlockManager(
val slaveActor = master.actorSystem.actorOf(Props(new BlockManagerSlaveActor(this)),
name = "BlockManagerActor" + BlockManager.ID_GENERATOR.next)
- @volatile private var shuttingDown = false
+ // Pending reregistration action being executed asynchronously or null if none
+ // is pending. Accesses should synchronize on asyncReregisterLock.
+ var asyncReregisterTask: Future[Unit] = null
+ val asyncReregisterLock = new Object
private def heartBeat() {
if (!master.sendHeartBeat(blockManagerId)) {
@@ -106,8 +110,9 @@ class BlockManager(
/**
* Construct a BlockManager with a memory limit set based on system properties.
*/
- def this(actorSystem: ActorSystem, master: BlockManagerMaster, serializer: Serializer) = {
- this(actorSystem, master, serializer, BlockManager.getMaxMemoryFromSystemProperties)
+ def this(execId: String, actorSystem: ActorSystem, master: BlockManagerMaster,
+ serializer: Serializer) = {
+ this(execId, actorSystem, master, serializer, BlockManager.getMaxMemoryFromSystemProperties)
}
/**
@@ -147,6 +152,8 @@ class BlockManager(
/**
* Reregister with the master and report all blocks to it. This will be called by the heart beat
* thread if our heartbeat to the block amnager indicates that we were not registered.
+ *
+ * Note that this method must be called without any BlockInfo locks held.
*/
def reregister() {
// TODO: We might need to rate limit reregistering.
@@ -156,6 +163,32 @@ class BlockManager(
}
/**
+ * Reregister with the master sometime soon.
+ */
+ def asyncReregister() {
+ asyncReregisterLock.synchronized {
+ if (asyncReregisterTask == null) {
+ asyncReregisterTask = Future[Unit] {
+ reregister()
+ asyncReregisterLock.synchronized {
+ asyncReregisterTask = null
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * For testing. Wait for any pending asynchronous reregistration; otherwise, do nothing.
+ */
+ def waitForAsyncReregister() {
+ val task = asyncReregisterTask
+ if (task != null) {
+ Await.ready(task, Duration.Inf)
+ }
+ }
+
+ /**
* Get storage level of local block. If no info exists for the block, then returns null.
*/
def getLevel(blockId: String): StorageLevel = blockInfo.get(blockId).map(_.level).orNull
@@ -170,7 +203,7 @@ class BlockManager(
if (needReregister) {
logInfo("Got told to reregister updating block " + blockId)
// Reregistering will report our new block for free.
- reregister()
+ asyncReregister()
}
logDebug("Told master about block " + blockId)
}
diff --git a/core/src/main/scala/spark/storage/BlockManagerId.scala b/core/src/main/scala/spark/storage/BlockManagerId.scala
index abb8b45a1f..f2f1e77d41 100644
--- a/core/src/main/scala/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerId.scala
@@ -7,27 +7,32 @@ import java.util.concurrent.ConcurrentHashMap
* This class represent an unique identifier for a BlockManager.
* The first 2 constructors of this class is made private to ensure that
* BlockManagerId objects can be created only using the factory method in
- * [[spark.storage.BlockManager$]]. This allows de-duplication of id objects.
+ * [[spark.storage.BlockManager$]]. This allows de-duplication of ID objects.
* Also, constructor parameters are private to ensure that parameters cannot
* be modified from outside this class.
*/
private[spark] class BlockManagerId private (
+ private var executorId_ : String,
private var ip_ : String,
private var port_ : Int
) extends Externalizable {
- private def this() = this(null, 0) // For deserialization only
+ private def this() = this(null, null, 0) // For deserialization only
- def ip = ip_
+ def executorId: String = executorId_
- def port = port_
+ def ip: String = ip_
+
+ def port: Int = port_
override def writeExternal(out: ObjectOutput) {
+ out.writeUTF(executorId_)
out.writeUTF(ip_)
out.writeInt(port_)
}
override def readExternal(in: ObjectInput) {
+ executorId_ = in.readUTF()
ip_ = in.readUTF()
port_ = in.readInt()
}
@@ -35,21 +40,23 @@ private[spark] class BlockManagerId private (
@throws(classOf[IOException])
private def readResolve(): Object = BlockManagerId.getCachedBlockManagerId(this)
- override def toString = "BlockManagerId(" + ip + ", " + port + ")"
+ override def toString = "BlockManagerId(%s, %s, %d)".format(executorId, ip, port)
- override def hashCode = ip.hashCode * 41 + port
+ override def hashCode: Int = (executorId.hashCode * 41 + ip.hashCode) * 41 + port
override def equals(that: Any) = that match {
- case id: BlockManagerId => port == id.port && ip == id.ip
- case _ => false
+ case id: BlockManagerId =>
+ executorId == id.executorId && port == id.port && ip == id.ip
+ case _ =>
+ false
}
}
private[spark] object BlockManagerId {
- def apply(ip: String, port: Int) =
- getCachedBlockManagerId(new BlockManagerId(ip, port))
+ def apply(execId: String, ip: String, port: Int) =
+ getCachedBlockManagerId(new BlockManagerId(execId, ip, port))
def apply(in: ObjectInput) = {
val obj = new BlockManagerId()
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 9fd2b454a4..36398095a2 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -1,6 +1,10 @@
package spark.storage
-import scala.collection.mutable.ArrayBuffer
+import java.io._
+import java.util.{HashMap => JHashMap}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.util.Random
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
@@ -19,7 +23,7 @@ private[spark] class BlockManagerMaster(
driverPort: Int)
extends Logging {
- val AKKA_RETRY_ATTEMPS: Int = System.getProperty("spark.akka.num.retries", "3").toInt
+ val AKKA_RETRY_ATTEMPTS: Int = System.getProperty("spark.akka.num.retries", "3").toInt
val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "3000").toInt
val DRIVER_AKKA_ACTOR_NAME = "BlockMasterManager"
@@ -40,10 +44,10 @@ private[spark] class BlockManagerMaster(
}
}
- /** Remove a dead host from the driver actor. This is only called on the driver side. */
- def notifyADeadHost(host: String) {
- tell(RemoveHost(host))
- logInfo("Removed " + host + " successfully in notifyADeadHost")
+ /** Remove a dead executor from the driver actor. This is only called on the driver side. */
+ def removeExecutor(execId: String) {
+ tell(RemoveExecutor(execId))
+ logInfo("Removed " + execId + " successfully in removeExecutor")
}
/**
@@ -141,7 +145,7 @@ private[spark] class BlockManagerMaster(
}
var attempts = 0
var lastException: Exception = null
- while (attempts < AKKA_RETRY_ATTEMPS) {
+ while (attempts < AKKA_RETRY_ATTEMPTS) {
attempts += 1
try {
val future = driverActor.ask(message)(timeout)
diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
index f4d026da33..f88517f1a3 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
@@ -23,9 +23,8 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
private val blockManagerInfo =
new HashMap[BlockManagerId, BlockManagerMasterActor.BlockManagerInfo]
- // Mapping from host name to block manager id. We allow multiple block managers
- // on the same host name (ip).
- private val blockManagerIdByHost = new HashMap[String, ArrayBuffer[BlockManagerId]]
+ // Mapping from executor ID to block manager ID.
+ private val blockManagerIdByExecutor = new HashMap[String, BlockManagerId]
// Mapping from block id to the set of block managers that have the block.
private val blockLocations = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]]
@@ -68,11 +67,14 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
case GetMemoryStatus =>
getMemoryStatus
+ case GetStorageStatus =>
+ getStorageStatus
+
case RemoveBlock(blockId) =>
removeBlock(blockId)
- case RemoveHost(host) =>
- removeHost(host)
+ case RemoveExecutor(execId) =>
+ removeExecutor(execId)
sender ! true
case StopBlockManagerMaster =>
@@ -96,16 +98,12 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
def removeBlockManager(blockManagerId: BlockManagerId) {
val info = blockManagerInfo(blockManagerId)
- // Remove the block manager from blockManagerIdByHost. If the list of block
- // managers belonging to the IP is empty, remove the entry from the hash map.
- blockManagerIdByHost.get(blockManagerId.ip).foreach { managers: ArrayBuffer[BlockManagerId] =>
- managers -= blockManagerId
- if (managers.size == 0) blockManagerIdByHost.remove(blockManagerId.ip)
- }
+ // Remove the block manager from blockManagerIdByExecutor.
+ blockManagerIdByExecutor -= blockManagerId.executorId
// Remove it from blockManagerInfo and remove all the blocks.
blockManagerInfo.remove(blockManagerId)
- var iterator = info.blocks.keySet.iterator
+ val iterator = info.blocks.keySet.iterator
while (iterator.hasNext) {
val blockId = iterator.next
val locations = blockLocations.get(blockId)._2
@@ -130,17 +128,15 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
toRemove.foreach(removeBlockManager)
}
- def removeHost(host: String) {
- logInfo("Trying to remove the host: " + host + " from BlockManagerMaster.")
- logInfo("Previous hosts: " + blockManagerInfo.keySet.toSeq)
- blockManagerIdByHost.get(host).foreach(_.foreach(removeBlockManager))
- logInfo("Current hosts: " + blockManagerInfo.keySet.toSeq)
+ def removeExecutor(execId: String) {
+ logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.")
+ blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
sender ! true
}
def heartBeat(blockManagerId: BlockManagerId) {
if (!blockManagerInfo.contains(blockManagerId)) {
- if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
+ if (blockManagerId.executorId == "<driver>" && !isLocal) {
sender ! true
} else {
sender ! false
@@ -177,24 +173,28 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
sender ! res
}
- private def register(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
- val startTimeMs = System.currentTimeMillis()
- val tmp = " " + blockManagerId + " "
+ private def getStorageStatus() {
+ val res = blockManagerInfo.map { case(blockManagerId, info) =>
+ import collection.JavaConverters._
+ StorageStatus(blockManagerId, info.maxMem, info.blocks.asScala.toMap)
+ }
+ sender ! res
+ }
- if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
- logInfo("Got Register Msg from master node, don't register it")
- } else {
- blockManagerIdByHost.get(blockManagerId.ip) match {
- case Some(managers) =>
- // A block manager of the same host name already exists.
- logInfo("Got another registration for host " + blockManagerId)
- managers += blockManagerId
+ private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
+ if (id.executorId == "<driver>" && !isLocal) {
+ // Got a register message from the master node; don't register it
+ } else if (!blockManagerInfo.contains(id)) {
+ blockManagerIdByExecutor.get(id.executorId) match {
+ case Some(manager) =>
+ // A block manager of the same host name already exists
+ logError("Got two different block manager registrations on " + id.executorId)
+ System.exit(1)
case None =>
- blockManagerIdByHost += (blockManagerId.ip -> ArrayBuffer(blockManagerId))
+ blockManagerIdByExecutor(id.executorId) = id
}
-
- blockManagerInfo += (blockManagerId -> new BlockManagerMasterActor.BlockManagerInfo(
- blockManagerId, System.currentTimeMillis(), maxMemSize, slaveActor))
+ blockManagerInfo(id) = new BlockManagerMasterActor.BlockManagerInfo(
+ id, System.currentTimeMillis(), maxMemSize, slaveActor)
}
sender ! true
}
@@ -206,11 +206,8 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
memSize: Long,
diskSize: Long) {
- val startTimeMs = System.currentTimeMillis()
- val tmp = " " + blockManagerId + " " + blockId + " "
-
if (!blockManagerInfo.contains(blockManagerId)) {
- if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
+ if (blockManagerId.executorId == "<driver>" && !isLocal) {
// We intentionally do not register the master (except in local mode),
// so we should not indicate failure.
sender ! true
@@ -342,8 +339,8 @@ object BlockManagerMasterActor {
_lastSeenMs = System.currentTimeMillis()
}
- def updateBlockInfo(blockId: String, storageLevel: StorageLevel, memSize: Long, diskSize: Long)
- : Unit = synchronized {
+ def updateBlockInfo(blockId: String, storageLevel: StorageLevel, memSize: Long,
+ diskSize: Long) {
updateLastSeenMs()
diff --git a/core/src/main/scala/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
index 30483b0b37..1494f90103 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
@@ -88,7 +88,7 @@ private[spark]
case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
private[spark]
-case class RemoveHost(host: String) extends ToBlockManagerMaster
+case class RemoveExecutor(execId: String) extends ToBlockManagerMaster
private[spark]
case object StopBlockManagerMaster extends ToBlockManagerMaster
@@ -98,3 +98,6 @@ case object GetMemoryStatus extends ToBlockManagerMaster
private[spark]
case object ExpireDeadHosts extends ToBlockManagerMaster
+
+private[spark]
+case object GetStorageStatus extends ToBlockManagerMaster
diff --git a/core/src/main/scala/spark/storage/BlockManagerUI.scala b/core/src/main/scala/spark/storage/BlockManagerUI.scala
new file mode 100644
index 0000000000..eda320fa47
--- /dev/null
+++ b/core/src/main/scala/spark/storage/BlockManagerUI.scala
@@ -0,0 +1,85 @@
+package spark.storage
+
+import akka.actor.{ActorRef, ActorSystem}
+import akka.pattern.ask
+import akka.util.Timeout
+import akka.util.duration._
+import cc.spray.directives._
+import cc.spray.typeconversion.TwirlSupport._
+import cc.spray.Directives
+import scala.collection.mutable.ArrayBuffer
+import spark.{Logging, SparkContext}
+import spark.util.AkkaUtils
+import spark.Utils
+
+
+/**
+ * Web UI server for the BlockManager inside each SparkContext.
+ */
+private[spark]
+class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef, sc: SparkContext)
+ extends Directives with Logging {
+
+ val STATIC_RESOURCE_DIR = "spark/deploy/static"
+
+ implicit val timeout = Timeout(10 seconds)
+
+ /** Start a HTTP server to run the Web interface */
+ def start() {
+ try {
+ val port = if (System.getProperty("spark.ui.port") != null) {
+ System.getProperty("spark.ui.port").toInt
+ } else {
+ // TODO: Unfortunately, it's not possible to pass port 0 to spray and figure out which
+ // random port it bound to, so we have to try to find a local one by creating a socket.
+ Utils.findFreePort()
+ }
+ AkkaUtils.startSprayServer(actorSystem, "0.0.0.0", port, handler, "BlockManagerHTTPServer")
+ logInfo("Started BlockManager web UI at http://%s:%d".format(Utils.localHostName(), port))
+ } catch {
+ case e: Exception =>
+ logError("Failed to create BlockManager WebUI", e)
+ System.exit(1)
+ }
+ }
+
+ val handler = {
+ get {
+ path("") {
+ completeWith {
+ // Request the current storage status from the Master
+ val future = blockManagerMaster ? GetStorageStatus
+ future.map { status =>
+ // Calculate macro-level statistics
+ val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray
+ val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
+ val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
+ val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
+ .reduceOption(_+_).getOrElse(0L)
+ val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
+ spark.storage.html.index.
+ render(maxMem, remainingMem, diskSpaceUsed, rdds, storageStatusList)
+ }
+ }
+ } ~
+ path("rdd") {
+ parameter("id") { id =>
+ completeWith {
+ val future = blockManagerMaster ? GetStorageStatus
+ future.map { status =>
+ val prefix = "rdd_" + id.toString
+ val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray
+ val filteredStorageStatusList = StorageUtils.
+ filterStorageStatusByPrefix(storageStatusList, prefix)
+ val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
+ spark.storage.html.rdd.render(rddInfo, filteredStorageStatusList)
+ }
+ }
+ }
+ } ~
+ pathPrefix("static") {
+ getFromResourceDirectory(STATIC_RESOURCE_DIR)
+ }
+ }
+ }
+}
diff --git a/core/src/main/scala/spark/storage/StorageLevel.scala b/core/src/main/scala/spark/storage/StorageLevel.scala
index d1d1c61c1c..3b5a77ab22 100644
--- a/core/src/main/scala/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/spark/storage/StorageLevel.scala
@@ -80,6 +80,14 @@ class StorageLevel private(
"StorageLevel(%b, %b, %b, %d)".format(useDisk, useMemory, deserialized, replication)
override def hashCode(): Int = toInt * 41 + replication
+ def description : String = {
+ var result = ""
+ result += (if (useDisk) "Disk " else "")
+ result += (if (useMemory) "Memory " else "")
+ result += (if (deserialized) "Deserialized " else "Serialized")
+ result += "%sx Replicated".format(replication)
+ result
+ }
}
diff --git a/core/src/main/scala/spark/storage/StorageUtils.scala b/core/src/main/scala/spark/storage/StorageUtils.scala
new file mode 100644
index 0000000000..a10e3a95c6
--- /dev/null
+++ b/core/src/main/scala/spark/storage/StorageUtils.scala
@@ -0,0 +1,78 @@
+package spark.storage
+
+import spark.SparkContext
+import BlockManagerMasterActor.BlockStatus
+
+private[spark]
+case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long,
+ blocks: Map[String, BlockStatus]) {
+
+ def memUsed(blockPrefix: String = "") = {
+ blocks.filterKeys(_.startsWith(blockPrefix)).values.map(_.memSize).
+ reduceOption(_+_).getOrElse(0l)
+ }
+
+ def diskUsed(blockPrefix: String = "") = {
+ blocks.filterKeys(_.startsWith(blockPrefix)).values.map(_.diskSize).
+ reduceOption(_+_).getOrElse(0l)
+ }
+
+ def memRemaining : Long = maxMem - memUsed()
+
+}
+
+case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel,
+ numPartitions: Int, memSize: Long, diskSize: Long)
+
+
+/* Helper methods for storage-related objects */
+private[spark]
+object StorageUtils {
+
+ /* Given the current storage status of the BlockManager, returns information for each RDD */
+ def rddInfoFromStorageStatus(storageStatusList: Array[StorageStatus],
+ sc: SparkContext) : Array[RDDInfo] = {
+ rddInfoFromBlockStatusList(storageStatusList.flatMap(_.blocks).toMap, sc)
+ }
+
+ /* Given a list of BlockStatus objets, returns information for each RDD */
+ def rddInfoFromBlockStatusList(infos: Map[String, BlockStatus],
+ sc: SparkContext) : Array[RDDInfo] = {
+ // Find all RDD Blocks (ignore broadcast variables)
+ val rddBlocks = infos.filterKeys(_.startsWith("rdd"))
+
+ // Group by rddId, ignore the partition name
+ val groupedRddBlocks = infos.groupBy { case(k, v) =>
+ k.substring(0,k.lastIndexOf('_'))
+ }.mapValues(_.values.toArray)
+
+ // For each RDD, generate an RDDInfo object
+ groupedRddBlocks.map { case(rddKey, rddBlocks) =>
+
+ // Add up memory and disk sizes
+ val memSize = rddBlocks.map(_.memSize).reduce(_ + _)
+ val diskSize = rddBlocks.map(_.diskSize).reduce(_ + _)
+
+ // Find the id of the RDD, e.g. rdd_1 => 1
+ val rddId = rddKey.split("_").last.toInt
+ // Get the friendly name for the rdd, if available.
+ val rddName = Option(sc.persistentRdds(rddId).name).getOrElse(rddKey)
+ val rddStorageLevel = sc.persistentRdds(rddId).getStorageLevel
+
+ RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, memSize, diskSize)
+ }.toArray
+ }
+
+ /* Removes all BlockStatus object that are not part of a block prefix */
+ def filterStorageStatusByPrefix(storageStatusList: Array[StorageStatus],
+ prefix: String) : Array[StorageStatus] = {
+
+ storageStatusList.map { status =>
+ val newBlocks = status.blocks.filterKeys(_.startsWith(prefix))
+ //val newRemainingMem = status.maxMem - newBlocks.values.map(_.memSize).reduce(_ + _)
+ StorageStatus(status.blockManagerId, status.maxMem, newBlocks)
+ }
+
+ }
+
+} \ No newline at end of file
diff --git a/core/src/main/scala/spark/storage/ThreadingTest.scala b/core/src/main/scala/spark/storage/ThreadingTest.scala
index 0b8f6d4303..a70d1c8e78 100644
--- a/core/src/main/scala/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/spark/storage/ThreadingTest.scala
@@ -78,7 +78,8 @@ private[spark] object ThreadingTest {
val driverIp: String = System.getProperty("spark.driver.host", "localhost")
val driverPort: Int = System.getProperty("spark.driver.port", "7077").toInt
val blockManagerMaster = new BlockManagerMaster(actorSystem, true, true, driverIp, driverPort)
- val blockManager = new BlockManager(actorSystem, blockManagerMaster, serializer, 1024 * 1024)
+ val blockManager = new BlockManager(
+ "<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024)
val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))
val consumers = producers.map(p => new ConsumerThread(blockManager, p.queue))
producers.foreach(_.start)
diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala
index fbd0ff46bf..e0fdeffbc4 100644
--- a/core/src/main/scala/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/spark/util/AkkaUtils.scala
@@ -1,6 +1,6 @@
package spark.util
-import akka.actor.{Props, ActorSystemImpl, ActorSystem}
+import akka.actor.{ActorRef, Props, ActorSystemImpl, ActorSystem}
import com.typesafe.config.ConfigFactory
import akka.util.duration._
import akka.pattern.ask
@@ -52,21 +52,22 @@ private[spark] object AkkaUtils {
/**
* Creates a Spray HTTP server bound to a given IP and port with a given Spray Route object to
- * handle requests. Throws a SparkException if this fails.
+ * handle requests. Returns the bound port or throws a SparkException on failure.
*/
- def startSprayServer(actorSystem: ActorSystem, ip: String, port: Int, route: Route) {
+ def startSprayServer(actorSystem: ActorSystem, ip: String, port: Int, route: Route,
+ name: String = "HttpServer"): ActorRef = {
val ioWorker = new IoWorker(actorSystem).start()
val httpService = actorSystem.actorOf(Props(new HttpService(route)))
val rootService = actorSystem.actorOf(Props(new SprayCanRootService(httpService)))
val server = actorSystem.actorOf(
- Props(new HttpServer(ioWorker, SingletonHandler(rootService))), name = "HttpServer")
+ Props(new HttpServer(ioWorker, SingletonHandler(rootService))), name = name)
actorSystem.registerOnTermination { ioWorker.stop() }
val timeout = 3.seconds
val future = server.ask(HttpServer.Bind(ip, port))(timeout)
try {
Await.result(future, timeout) match {
case bound: HttpServer.Bound =>
- return
+ return server
case other: Any =>
throw new SparkException("Failed to bind web UI to port " + port + ": " + other)
}
diff --git a/core/src/main/scala/spark/util/MetadataCleaner.scala b/core/src/main/scala/spark/util/MetadataCleaner.scala
index 139e21d09e..51fb440108 100644
--- a/core/src/main/scala/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/spark/util/MetadataCleaner.scala
@@ -5,6 +5,9 @@ import java.util.{TimerTask, Timer}
import spark.Logging
+/**
+ * Runs a timer task to periodically clean up metadata (e.g. old files or hashtable entries)
+ */
class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging {
val delaySeconds = MetadataCleaner.getDelaySeconds
@@ -14,18 +17,16 @@ class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging
val task = new TimerTask {
def run() {
try {
- if (delaySeconds > 0) {
- cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))
- logInfo("Ran metadata cleaner for " + name)
- }
+ cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))
+ logInfo("Ran metadata cleaner for " + name)
} catch {
case e: Exception => logError("Error running cleanup task for " + name, e)
}
}
}
- if (periodSeconds > 0) {
- logInfo(
+ if (delaySeconds > 0) {
+ logDebug(
"Starting metadata cleaner for " + name + " with delay of " + delaySeconds + " seconds and "
+ "period of " + periodSeconds + " secs")
timer.schedule(task, periodSeconds * 1000, periodSeconds * 1000)
diff --git a/core/src/main/scala/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/spark/util/TimeStampedHashMap.scala
index bb7c5c01c8..188f8910da 100644
--- a/core/src/main/scala/spark/util/TimeStampedHashMap.scala
+++ b/core/src/main/scala/spark/util/TimeStampedHashMap.scala
@@ -63,9 +63,9 @@ class TimeStampedHashMap[A, B] extends Map[A, B]() with spark.Logging {
override def empty: Map[A, B] = new TimeStampedHashMap[A, B]()
- override def size(): Int = internalMap.size()
+ override def size: Int = internalMap.size
- override def foreach[U](f: ((A, B)) => U): Unit = {
+ override def foreach[U](f: ((A, B)) => U) {
val iterator = internalMap.entrySet().iterator()
while(iterator.hasNext) {
val entry = iterator.next()
diff --git a/core/src/main/twirl/spark/deploy/common/layout.scala.html b/core/src/main/twirl/spark/common/layout.scala.html
index b9192060aa..b9192060aa 100644
--- a/core/src/main/twirl/spark/deploy/common/layout.scala.html
+++ b/core/src/main/twirl/spark/common/layout.scala.html
diff --git a/core/src/main/twirl/spark/deploy/master/index.scala.html b/core/src/main/twirl/spark/deploy/master/index.scala.html
index 18c32e5a1f..285645c389 100644
--- a/core/src/main/twirl/spark/deploy/master/index.scala.html
+++ b/core/src/main/twirl/spark/deploy/master/index.scala.html
@@ -2,7 +2,7 @@
@import spark.deploy.master._
@import spark.Utils
-@spark.deploy.common.html.layout(title = "Spark Master on " + state.uri) {
+@spark.common.html.layout(title = "Spark Master on " + state.uri) {
<!-- Cluster Details -->
<div class="row">
diff --git a/core/src/main/twirl/spark/deploy/master/job_details.scala.html b/core/src/main/twirl/spark/deploy/master/job_details.scala.html
index dcf41c28f2..d02a51b214 100644
--- a/core/src/main/twirl/spark/deploy/master/job_details.scala.html
+++ b/core/src/main/twirl/spark/deploy/master/job_details.scala.html
@@ -1,6 +1,6 @@
@(job: spark.deploy.master.JobInfo)
-@spark.deploy.common.html.layout(title = "Job Details") {
+@spark.common.html.layout(title = "Job Details") {
<!-- Job Details -->
<div class="row">
diff --git a/core/src/main/twirl/spark/deploy/worker/index.scala.html b/core/src/main/twirl/spark/deploy/worker/index.scala.html
index b247307dab..1d703dae58 100644
--- a/core/src/main/twirl/spark/deploy/worker/index.scala.html
+++ b/core/src/main/twirl/spark/deploy/worker/index.scala.html
@@ -1,8 +1,7 @@
@(worker: spark.deploy.WorkerState)
-
@import spark.Utils
-@spark.deploy.common.html.layout(title = "Spark Worker on " + worker.uri) {
+@spark.common.html.layout(title = "Spark Worker on " + worker.uri) {
<!-- Worker Details -->
<div class="row">
diff --git a/core/src/main/twirl/spark/storage/index.scala.html b/core/src/main/twirl/spark/storage/index.scala.html
new file mode 100644
index 0000000000..2b337f6133
--- /dev/null
+++ b/core/src/main/twirl/spark/storage/index.scala.html
@@ -0,0 +1,40 @@
+@(maxMem: Long, remainingMem: Long, diskSpaceUsed: Long, rdds: Array[spark.storage.RDDInfo], storageStatusList: Array[spark.storage.StorageStatus])
+@import spark.Utils
+
+@spark.common.html.layout(title = "Storage Dashboard") {
+
+ <!-- High-Level Information -->
+ <div class="row">
+ <div class="span12">
+ <ul class="unstyled">
+ <li><strong>Memory:</strong>
+ @{Utils.memoryBytesToString(maxMem - remainingMem)} Used
+ (@{Utils.memoryBytesToString(remainingMem)} Available) </li>
+ <li><strong>Disk:</strong> @{Utils.memoryBytesToString(diskSpaceUsed)} Used </li>
+ </ul>
+ </div>
+ </div>
+
+ <hr/>
+
+ <!-- RDD Summary -->
+ <div class="row">
+ <div class="span12">
+ <h3> RDD Summary </h3>
+ <br/>
+ @rdd_table(rdds)
+ </div>
+ </div>
+
+ <hr/>
+
+ <!-- Worker Summary -->
+ <div class="row">
+ <div class="span12">
+ <h3> Worker Summary </h3>
+ <br/>
+ @worker_table(storageStatusList)
+ </div>
+ </div>
+
+} \ No newline at end of file
diff --git a/core/src/main/twirl/spark/storage/rdd.scala.html b/core/src/main/twirl/spark/storage/rdd.scala.html
new file mode 100644
index 0000000000..ac7f8c981f
--- /dev/null
+++ b/core/src/main/twirl/spark/storage/rdd.scala.html
@@ -0,0 +1,77 @@
+@(rddInfo: spark.storage.RDDInfo, storageStatusList: Array[spark.storage.StorageStatus])
+@import spark.Utils
+
+@spark.common.html.layout(title = "RDD Info ") {
+
+ <!-- High-Level Information -->
+ <div class="row">
+ <div class="span12">
+ <ul class="unstyled">
+ <li>
+ <strong>Storage Level:</strong>
+ @(rddInfo.storageLevel.description)
+ <li>
+ <strong>Partitions:</strong>
+ @(rddInfo.numPartitions)
+ </li>
+ <li>
+ <strong>Memory Size:</strong>
+ @{Utils.memoryBytesToString(rddInfo.memSize)}
+ </li>
+ <li>
+ <strong>Disk Size:</strong>
+ @{Utils.memoryBytesToString(rddInfo.diskSize)}
+ </li>
+ </ul>
+ </div>
+ </div>
+
+ <hr/>
+
+ <!-- RDD Summary -->
+ <div class="row">
+ <div class="span12">
+ <h3> RDD Summary </h3>
+ <br/>
+
+
+ <!-- Block Table Summary -->
+ <table class="table table-bordered table-striped table-condensed sortable">
+ <thead>
+ <tr>
+ <th>Block Name</th>
+ <th>Storage Level</th>
+ <th>Size in Memory</th>
+ <th>Size on Disk</th>
+ </tr>
+ </thead>
+ <tbody>
+ @storageStatusList.flatMap(_.blocks).toArray.sortWith(_._1 < _._1).map { case (k,v) =>
+ <tr>
+ <td>@k</td>
+ <td>
+ @(v.storageLevel.description)
+ </td>
+ <td>@{Utils.memoryBytesToString(v.memSize)}</td>
+ <td>@{Utils.memoryBytesToString(v.diskSize)}</td>
+ </tr>
+ }
+ </tbody>
+ </table>
+
+
+ </div>
+ </div>
+
+ <hr/>
+
+ <!-- Worker Table -->
+ <div class="row">
+ <div class="span12">
+ <h3> Worker Summary </h3>
+ <br/>
+ @worker_table(storageStatusList, "rdd_" + rddInfo.id )
+ </div>
+ </div>
+
+} \ No newline at end of file
diff --git a/core/src/main/twirl/spark/storage/rdd_table.scala.html b/core/src/main/twirl/spark/storage/rdd_table.scala.html
new file mode 100644
index 0000000000..af801cf229
--- /dev/null
+++ b/core/src/main/twirl/spark/storage/rdd_table.scala.html
@@ -0,0 +1,30 @@
+@(rdds: Array[spark.storage.RDDInfo])
+@import spark.Utils
+
+<table class="table table-bordered table-striped table-condensed sortable">
+ <thead>
+ <tr>
+ <th>RDD Name</th>
+ <th>Storage Level</th>
+ <th>Partitions</th>
+ <th>Size in Memory</th>
+ <th>Size on Disk</th>
+ </tr>
+ </thead>
+ <tbody>
+ @for(rdd <- rdds) {
+ <tr>
+ <td>
+ <a href="rdd?id=@(rdd.id)">
+ @rdd.name
+ </a>
+ </td>
+ <td>@(rdd.storageLevel.description)
+ </td>
+ <td>@rdd.numPartitions</td>
+ <td>@{Utils.memoryBytesToString(rdd.memSize)}</td>
+ <td>@{Utils.memoryBytesToString(rdd.diskSize)}</td>
+ </tr>
+ }
+ </tbody>
+</table> \ No newline at end of file
diff --git a/core/src/main/twirl/spark/storage/worker_table.scala.html b/core/src/main/twirl/spark/storage/worker_table.scala.html
new file mode 100644
index 0000000000..d54b8de4cc
--- /dev/null
+++ b/core/src/main/twirl/spark/storage/worker_table.scala.html
@@ -0,0 +1,24 @@
+@(workersStatusList: Array[spark.storage.StorageStatus], prefix: String = "")
+@import spark.Utils
+
+<table class="table table-bordered table-striped table-condensed sortable">
+ <thead>
+ <tr>
+ <th>Host</th>
+ <th>Memory Usage</th>
+ <th>Disk Usage</th>
+ </tr>
+ </thead>
+ <tbody>
+ @for(status <- workersStatusList) {
+ <tr>
+ <td>@(status.blockManagerId.ip + ":" + status.blockManagerId.port)</td>
+ <td>
+ @(Utils.memoryBytesToString(status.memUsed(prefix)))
+ (@(Utils.memoryBytesToString(status.memRemaining)) Total Available)
+ </td>
+ <td>@(Utils.memoryBytesToString(status.diskUsed(prefix)))</td>
+ </tr>
+ }
+ </tbody>
+</table> \ No newline at end of file
diff --git a/core/src/test/scala/spark/DriverSuite.scala b/core/src/test/scala/spark/DriverSuite.scala
index 70a7c8bc2f..342610e1dd 100644
--- a/core/src/test/scala/spark/DriverSuite.scala
+++ b/core/src/test/scala/spark/DriverSuite.scala
@@ -13,7 +13,8 @@ class DriverSuite extends FunSuite with Timeouts {
val masters = Table(("master"), ("local"), ("local-cluster[2,1,512]"))
forAll(masters) { (master: String) =>
failAfter(10 seconds) {
- Utils.execute(Seq("./run", "spark.DriverWithoutCleanup", master), new File(System.getenv("SPARK_HOME")))
+ Utils.execute(Seq("./run", "spark.DriverWithoutCleanup", master),
+ new File(System.getenv("SPARK_HOME")))
}
}
}
@@ -28,4 +29,4 @@ object DriverWithoutCleanup {
val sc = new SparkContext(args(0), "DriverWithoutCleanup")
sc.parallelize(1 to 100, 4).count()
}
-} \ No newline at end of file
+}
diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java
index 42ce6f3c74..934e4c2f67 100644
--- a/core/src/test/scala/spark/JavaAPISuite.java
+++ b/core/src/test/scala/spark/JavaAPISuite.java
@@ -356,6 +356,34 @@ public class JavaAPISuite implements Serializable {
}
@Test
+ public void mapsFromPairsToPairs() {
+ List<Tuple2<Integer, String>> pairs = Arrays.asList(
+ new Tuple2<Integer, String>(1, "a"),
+ new Tuple2<Integer, String>(2, "aa"),
+ new Tuple2<Integer, String>(3, "aaa")
+ );
+ JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs);
+
+ // Regression test for SPARK-668:
+ JavaPairRDD<String, Integer> swapped = pairRDD.flatMap(
+ new PairFlatMapFunction<Tuple2<Integer, String>, String, Integer>() {
+ @Override
+ public Iterable<Tuple2<String, Integer>> call(Tuple2<Integer, String> item) throws Exception {
+ return Collections.singletonList(item.swap());
+ }
+ });
+ swapped.collect();
+
+ // There was never a bug here, but it's worth testing:
+ pairRDD.map(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
+ @Override
+ public Tuple2<String, Integer> call(Tuple2<Integer, String> item) throws Exception {
+ return item.swap();
+ }
+ }).collect();
+ }
+
+ @Test
public void mapPartitions() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
JavaRDD<Integer> partitionSums = rdd.mapPartitions(
diff --git a/core/src/test/scala/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
index 718107d2b5..f4e7ec39fe 100644
--- a/core/src/test/scala/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
@@ -43,13 +43,13 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
val compressedSize10000 = MapOutputTracker.compressSize(10000L)
val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
val size10000 = MapOutputTracker.decompressSize(compressedSize10000)
- tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("hostA", 1000),
+ tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000),
Array(compressedSize1000, compressedSize10000)))
- tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("hostB", 1000),
+ tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000),
Array(compressedSize10000, compressedSize1000)))
val statuses = tracker.getServerStatuses(10, 0)
- assert(statuses.toSeq === Seq((BlockManagerId("hostA", 1000), size1000),
- (BlockManagerId("hostB", 1000), size10000)))
+ assert(statuses.toSeq === Seq((BlockManagerId("a", "hostA", 1000), size1000),
+ (BlockManagerId("b", "hostB", 1000), size10000)))
tracker.stop()
}
@@ -61,47 +61,52 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
val compressedSize10000 = MapOutputTracker.compressSize(10000L)
val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
val size10000 = MapOutputTracker.decompressSize(compressedSize10000)
- tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("hostA", 1000),
+ tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000),
Array(compressedSize1000, compressedSize1000, compressedSize1000)))
- tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("hostB", 1000),
+ tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000),
Array(compressedSize10000, compressedSize1000, compressedSize1000)))
// As if we had two simulatenous fetch failures
- tracker.unregisterMapOutput(10, 0, BlockManagerId("hostA", 1000))
- tracker.unregisterMapOutput(10, 0, BlockManagerId("hostA", 1000))
+ tracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000))
+ tracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000))
- // The remaining reduce task might try to grab the output dispite the shuffle failure;
+ // The remaining reduce task might try to grab the output despite the shuffle failure;
// this should cause it to fail, and the scheduler will ignore the failure due to the
// stage already being aborted.
intercept[FetchFailedException] { tracker.getServerStatuses(10, 1) }
}
test("remote fetch") {
- val (actorSystem, boundPort) =
- AkkaUtils.createActorSystem("test", "localhost", 0)
- System.setProperty("spark.driver.port", boundPort.toString)
- val masterTracker = new MapOutputTracker(actorSystem, true)
- val slaveTracker = new MapOutputTracker(actorSystem, false)
- masterTracker.registerShuffle(10, 1)
- masterTracker.incrementGeneration()
- slaveTracker.updateGeneration(masterTracker.getGeneration)
- intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
+ try {
+ System.clearProperty("spark.driver.host") // In case some previous test had set it
+ val (actorSystem, boundPort) =
+ AkkaUtils.createActorSystem("test", "localhost", 0)
+ System.setProperty("spark.driver.port", boundPort.toString)
+ val masterTracker = new MapOutputTracker(actorSystem, true)
+ val slaveTracker = new MapOutputTracker(actorSystem, false)
+ masterTracker.registerShuffle(10, 1)
+ masterTracker.incrementGeneration()
+ slaveTracker.updateGeneration(masterTracker.getGeneration)
+ intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
- val compressedSize1000 = MapOutputTracker.compressSize(1000L)
- val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
- masterTracker.registerMapOutput(10, 0, new MapStatus(
- BlockManagerId("hostA", 1000), Array(compressedSize1000)))
- masterTracker.incrementGeneration()
- slaveTracker.updateGeneration(masterTracker.getGeneration)
- assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
- Seq((BlockManagerId("hostA", 1000), size1000)))
+ val compressedSize1000 = MapOutputTracker.compressSize(1000L)
+ val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
+ masterTracker.registerMapOutput(10, 0, new MapStatus(
+ BlockManagerId("a", "hostA", 1000), Array(compressedSize1000)))
+ masterTracker.incrementGeneration()
+ slaveTracker.updateGeneration(masterTracker.getGeneration)
+ assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
+ Seq((BlockManagerId("a", "hostA", 1000), size1000)))
- masterTracker.unregisterMapOutput(10, 0, BlockManagerId("hostA", 1000))
- masterTracker.incrementGeneration()
- slaveTracker.updateGeneration(masterTracker.getGeneration)
- intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
+ masterTracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000))
+ masterTracker.incrementGeneration()
+ slaveTracker.updateGeneration(masterTracker.getGeneration)
+ intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
- // failure should be cached
- intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
+ // failure should be cached
+ intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
+ } finally {
+ System.clearProperty("spark.driver.port")
+ }
}
}
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index a1aeb12f25..2d177bbf67 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -86,9 +86,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("BlockManagerId object caching") {
- val id1 = BlockManagerId("XXX", 1)
- val id2 = BlockManagerId("XXX", 1) // this should return the same object as id1
- val id3 = BlockManagerId("XXX", 2) // this should return a different object
+ val id1 = BlockManagerId("e1", "XXX", 1)
+ val id2 = BlockManagerId("e1", "XXX", 1) // this should return the same object as id1
+ val id3 = BlockManagerId("e1", "XXX", 2) // this should return a different object
assert(id2 === id1, "id2 is not same as id1")
assert(id2.eq(id1), "id2 is not the same object as id1")
assert(id3 != id1, "id3 is same as id1")
@@ -103,7 +103,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("master + 1 manager interaction") {
- store = new BlockManager(actorSystem, master, serializer, 2000)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -133,8 +133,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("master + 2 managers interaction") {
- store = new BlockManager(actorSystem, master, serializer, 2000)
- store2 = new BlockManager(actorSystem, master, new KryoSerializer, 2000)
+ store = new BlockManager("exec1", actorSystem, master, serializer, 2000)
+ store2 = new BlockManager("exec2", actorSystem, master, new KryoSerializer, 2000)
val peers = master.getPeers(store.blockManagerId, 1)
assert(peers.size === 1, "master did not return the other manager as a peer")
@@ -149,7 +149,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("removing block") {
- store = new BlockManager(actorSystem, master, serializer, 2000)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -198,7 +198,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
test("reregistration on heart beat") {
val heartBeat = PrivateMethod[Unit]('heartBeat)
- store = new BlockManager(actorSystem, master, serializer, 2000)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
val a1 = new Array[Byte](400)
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
@@ -206,7 +206,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(store.getSingle("a1") != None, "a1 was not in store")
assert(master.getLocations("a1").size > 0, "master was not told about a1")
- master.notifyADeadHost(store.blockManagerId.ip)
+ master.removeExecutor(store.blockManagerId.executorId)
assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
store invokePrivate heartBeat()
@@ -214,25 +214,63 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("reregistration on block update") {
- store = new BlockManager(actorSystem, master, serializer, 2000)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
-
assert(master.getLocations("a1").size > 0, "master was not told about a1")
- master.notifyADeadHost(store.blockManagerId.ip)
+ master.removeExecutor(store.blockManagerId.executorId)
assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
store.putSingle("a2", a1, StorageLevel.MEMORY_ONLY)
+ store.waitForAsyncReregister()
assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master")
assert(master.getLocations("a2").size > 0, "master was not told about a2")
}
+ test("reregistration doesn't dead lock") {
+ val heartBeat = PrivateMethod[Unit]('heartBeat)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
+ val a1 = new Array[Byte](400)
+ val a2 = List(new Array[Byte](400))
+
+ // try many times to trigger any deadlocks
+ for (i <- 1 to 100) {
+ master.removeExecutor(store.blockManagerId.executorId)
+ val t1 = new Thread {
+ override def run() {
+ store.put("a2", a2.iterator, StorageLevel.MEMORY_ONLY, true)
+ }
+ }
+ val t2 = new Thread {
+ override def run() {
+ store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+ }
+ }
+ val t3 = new Thread {
+ override def run() {
+ store invokePrivate heartBeat()
+ }
+ }
+
+ t1.start()
+ t2.start()
+ t3.start()
+ t1.join()
+ t2.join()
+ t3.join()
+
+ store.dropFromMemory("a1", null)
+ store.dropFromMemory("a2", null)
+ store.waitForAsyncReregister()
+ }
+ }
+
test("in-memory LRU storage") {
- store = new BlockManager(actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -251,7 +289,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU storage with serialization") {
- store = new BlockManager(actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -270,14 +308,14 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU for partitions of same RDD") {
- store = new BlockManager(actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
store.putSingle("rdd_0_1", a1, StorageLevel.MEMORY_ONLY)
store.putSingle("rdd_0_2", a2, StorageLevel.MEMORY_ONLY)
store.putSingle("rdd_0_3", a3, StorageLevel.MEMORY_ONLY)
- // Even though we accessed rdd_0_3 last, it should not have replaced partitiosn 1 and 2
+ // Even though we accessed rdd_0_3 last, it should not have replaced partitions 1 and 2
// from the same RDD
assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store")
assert(store.getSingle("rdd_0_2") != None, "rdd_0_2 was not in store")
@@ -289,7 +327,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU for partitions of multiple RDDs") {
- store = new BlockManager(actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
store.putSingle("rdd_0_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
store.putSingle("rdd_0_2", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
store.putSingle("rdd_1_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
@@ -312,7 +350,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("on-disk storage") {
- store = new BlockManager(actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -325,7 +363,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage") {
- store = new BlockManager(actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -340,7 +378,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage with getLocalBytes") {
- store = new BlockManager(actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -355,7 +393,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage with serialization") {
- store = new BlockManager(actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -370,7 +408,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage with serialization and getLocalBytes") {
- store = new BlockManager(actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -385,7 +423,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("LRU with mixed storage levels") {
- store = new BlockManager(actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -410,7 +448,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU with streams") {
- store = new BlockManager(actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
val list1 = List(new Array[Byte](200), new Array[Byte](200))
val list2 = List(new Array[Byte](200), new Array[Byte](200))
val list3 = List(new Array[Byte](200), new Array[Byte](200))
@@ -434,7 +472,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("LRU with mixed storage levels and streams") {
- store = new BlockManager(actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
val list1 = List(new Array[Byte](200), new Array[Byte](200))
val list2 = List(new Array[Byte](200), new Array[Byte](200))
val list3 = List(new Array[Byte](200), new Array[Byte](200))
@@ -480,7 +518,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("overly large block") {
- store = new BlockManager(actorSystem, master, serializer, 500)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 500)
store.putSingle("a1", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
assert(store.getSingle("a1") === None, "a1 was in store")
store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK)
@@ -491,49 +529,49 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
test("block compression") {
try {
System.setProperty("spark.shuffle.compress", "true")
- store = new BlockManager(actorSystem, master, serializer, 2000)
+ store = new BlockManager("exec1", actorSystem, master, serializer, 2000)
store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("shuffle_0_0_0") <= 100, "shuffle_0_0_0 was not compressed")
store.stop()
store = null
System.setProperty("spark.shuffle.compress", "false")
- store = new BlockManager(actorSystem, master, serializer, 2000)
+ store = new BlockManager("exec2", actorSystem, master, serializer, 2000)
store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("shuffle_0_0_0") >= 1000, "shuffle_0_0_0 was compressed")
store.stop()
store = null
System.setProperty("spark.broadcast.compress", "true")
- store = new BlockManager(actorSystem, master, serializer, 2000)
+ store = new BlockManager("exec3", actorSystem, master, serializer, 2000)
store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("broadcast_0") <= 100, "broadcast_0 was not compressed")
store.stop()
store = null
System.setProperty("spark.broadcast.compress", "false")
- store = new BlockManager(actorSystem, master, serializer, 2000)
+ store = new BlockManager("exec4", actorSystem, master, serializer, 2000)
store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("broadcast_0") >= 1000, "broadcast_0 was compressed")
store.stop()
store = null
System.setProperty("spark.rdd.compress", "true")
- store = new BlockManager(actorSystem, master, serializer, 2000)
+ store = new BlockManager("exec5", actorSystem, master, serializer, 2000)
store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("rdd_0_0") <= 100, "rdd_0_0 was not compressed")
store.stop()
store = null
System.setProperty("spark.rdd.compress", "false")
- store = new BlockManager(actorSystem, master, serializer, 2000)
+ store = new BlockManager("exec6", actorSystem, master, serializer, 2000)
store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("rdd_0_0") >= 1000, "rdd_0_0 was compressed")
store.stop()
store = null
// Check that any other block types are also kept uncompressed
- store = new BlockManager(actorSystem, master, serializer, 2000)
+ store = new BlockManager("exec7", actorSystem, master, serializer, 2000)
store.putSingle("other_block", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
assert(store.memoryStore.getSize("other_block") >= 1000, "other_block was compressed")
store.stop()
diff --git a/sbt/sbt b/sbt/sbt
index a3055c13c1..8f426d18e8 100755
--- a/sbt/sbt
+++ b/sbt/sbt
@@ -5,4 +5,4 @@ if [ "$MESOS_HOME" != "" ]; then
fi
export SPARK_HOME=$(cd "$(dirname $0)/.."; pwd)
export SPARK_TESTING=1 # To put test classes on classpath
-java -Xmx1200M -XX:MaxPermSize=200m $EXTRA_ARGS -jar $SPARK_HOME/sbt/sbt-launch-*.jar "$@"
+java -Xmx1200M -XX:MaxPermSize=250m $EXTRA_ARGS -jar $SPARK_HOME/sbt/sbt-launch-*.jar "$@"