author     Kay Ousterhout <kayousterhout@gmail.com>    2013-11-15 10:59:33 -0800
committer  Kay Ousterhout <kayousterhout@gmail.com>    2013-11-15 10:59:33 -0800
commit     0913c2297158a12656648b0a4b2023ae587da3b4 (patch)
tree       97c9c10ff6bfb213fbb936679d8410acea3ce8d9 /core/src/main/scala/org/apache
parent     52144caaa70363ffcc63e1f52db32eb1654c1213 (diff)
parent     96e0fb46309698b685c811a65bd8e1a691389994 (diff)
Merge remote-tracking branch 'upstream/master' into consolidate_schedulers
Conflicts:
	core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                                        |   1
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala                                        |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala                               | 104
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala               |  12
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala  |   1
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala                                 |   4
6 files changed, 45 insertions(+), 79 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 06bea0c535..d5616c274d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -257,7 +257,6 @@ class SparkContext(
taskScheduler.start()
@volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler)
- dagScheduler.start()
ui.start()
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 47e958b5e6..53f77a38f5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -52,7 +52,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
* sources in HBase, or S3).
*
* @param sc The SparkContext to associate the RDD with.
- * @param broadCastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed
+ * @param broadcastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed
* variabe references an instance of JobConf, then that JobConf will be used for the Hadoop job.
* Otherwise, a new JobConf will be created on each slave using the enclosed Configuration.
* @param initLocalJobConfFuncOpt Optional closure used to initialize any JobConf that HadoopRDD
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index d0b21e896e..42bb3884c8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -19,9 +19,10 @@ package org.apache.spark.scheduler
import java.io.NotSerializableException
import java.util.Properties
-import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
+import akka.actor._
+import akka.util.duration._
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
import org.apache.spark._
@@ -65,12 +66,12 @@ class DAGScheduler(
// Called by TaskScheduler to report task's starting.
def taskStarted(task: Task[_], taskInfo: TaskInfo) {
- eventQueue.put(BeginEvent(task, taskInfo))
+ eventProcessActor ! BeginEvent(task, taskInfo)
}
// Called to report that a task has completed and results are being fetched remotely.
def taskGettingResult(task: Task[_], taskInfo: TaskInfo) {
- eventQueue.put(GettingResultEvent(task, taskInfo))
+ eventProcessActor ! GettingResultEvent(task, taskInfo)
}
// Called by TaskScheduler to report task completions or failures.
@@ -81,23 +82,23 @@ class DAGScheduler(
accumUpdates: Map[Long, Any],
taskInfo: TaskInfo,
taskMetrics: TaskMetrics) {
- eventQueue.put(CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics))
+ eventProcessActor ! CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics)
}
// Called by TaskScheduler when an executor fails.
def executorLost(execId: String) {
- eventQueue.put(ExecutorLost(execId))
+ eventProcessActor ! ExecutorLost(execId)
}
// Called by TaskScheduler when a host is added
def executorGained(execId: String, host: String) {
- eventQueue.put(ExecutorGained(execId, host))
+ eventProcessActor ! ExecutorGained(execId, host)
}
// Called by TaskScheduler to cancel an entire TaskSet due to either repeated failures or
// cancellation of the job itself.
def taskSetFailed(taskSet: TaskSet, reason: String) {
- eventQueue.put(TaskSetFailed(taskSet, reason))
+ eventProcessActor ! TaskSetFailed(taskSet, reason)
}
// The time, in millis, to wait for fetch failure events to stop coming in after one is detected;
@@ -109,7 +110,30 @@ class DAGScheduler(
// resubmit failed stages
val POLL_TIMEOUT = 10L
- private val eventQueue = new LinkedBlockingQueue[DAGSchedulerEvent]
+ private val eventProcessActor: ActorRef = env.actorSystem.actorOf(Props(new Actor {
+ override def preStart() {
+ context.system.scheduler.schedule(RESUBMIT_TIMEOUT milliseconds, RESUBMIT_TIMEOUT milliseconds) {
+ if (failed.size > 0) {
+ resubmitFailedStages()
+ }
+ }
+ }
+
+ /**
+ * The main event loop of the DAG scheduler, which waits for new-job / task-finished / failure
+ * events and responds by launching tasks. This runs in a dedicated thread and receives events
+ * via the eventQueue.
+ */
+ def receive = {
+ case event: DAGSchedulerEvent =>
+ logDebug("Got event of type " + event.getClass.getName)
+
+ if (!processEvent(event))
+ submitWaitingStages()
+ else
+ context.stop(self)
+ }
+ }))
private[scheduler] val nextJobId = new AtomicInteger(0)
@@ -150,16 +174,6 @@ class DAGScheduler(
val metadataCleaner = new MetadataCleaner(MetadataCleanerType.DAG_SCHEDULER, this.cleanup)
- // Start a thread to run the DAGScheduler event loop
- def start() {
- new Thread("DAGScheduler") {
- setDaemon(true)
- override def run() {
- DAGScheduler.this.run()
- }
- }.start()
- }
-
def addSparkListener(listener: SparkListener) {
listenerBus.addListener(listener)
}
@@ -301,8 +315,7 @@ class DAGScheduler(
assert(partitions.size > 0)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
- eventQueue.put(JobSubmitted(jobId, rdd, func2, partitions.toArray, allowLocal, callSite,
- waiter, properties))
+ eventProcessActor ! JobSubmitted(jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties)
waiter
}
@@ -337,8 +350,7 @@ class DAGScheduler(
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val partitions = (0 until rdd.partitions.size).toArray
val jobId = nextJobId.getAndIncrement()
- eventQueue.put(JobSubmitted(jobId, rdd, func2, partitions, allowLocal = false, callSite,
- listener, properties))
+ eventProcessActor ! JobSubmitted(jobId, rdd, func2, partitions, allowLocal = false, callSite, listener, properties)
listener.awaitResult() // Will throw an exception if the job fails
}
@@ -347,19 +359,19 @@ class DAGScheduler(
*/
def cancelJob(jobId: Int) {
logInfo("Asked to cancel job " + jobId)
- eventQueue.put(JobCancelled(jobId))
+ eventProcessActor ! JobCancelled(jobId)
}
def cancelJobGroup(groupId: String) {
logInfo("Asked to cancel job group " + groupId)
- eventQueue.put(JobGroupCancelled(groupId))
+ eventProcessActor ! JobGroupCancelled(groupId)
}
/**
* Cancel all jobs that are running or waiting in the queue.
*/
def cancelAllJobs() {
- eventQueue.put(AllJobsCancelled)
+ eventProcessActor ! AllJobsCancelled
}
/**
@@ -474,42 +486,6 @@ class DAGScheduler(
}
}
-
- /**
- * The main event loop of the DAG scheduler, which waits for new-job / task-finished / failure
- * events and responds by launching tasks. This runs in a dedicated thread and receives events
- * via the eventQueue.
- */
- private def run() {
- SparkEnv.set(env)
-
- while (true) {
- val event = eventQueue.poll(POLL_TIMEOUT, TimeUnit.MILLISECONDS)
- if (event != null) {
- logDebug("Got event of type " + event.getClass.getName)
- }
- this.synchronized { // needed in case other threads makes calls into methods of this class
- if (event != null) {
- if (processEvent(event)) {
- return
- }
- }
-
- val time = System.currentTimeMillis() // TODO: use a pluggable clock for testability
- // Periodically resubmit failed stages if some map output fetches have failed and we have
- // waited at least RESUBMIT_TIMEOUT. We wait for this short time because when a node fails,
- // tasks on many other nodes are bound to get a fetch failure, and they won't all get it at
- // the same time, so we want to make sure we've identified all the reduce tasks that depend
- // on the failed node.
- if (failed.size > 0 && time > lastFetchFailureTime + RESUBMIT_TIMEOUT) {
- resubmitFailedStages()
- } else {
- submitWaitingStages()
- }
- }
- }
- }
-
/**
* Run a job on an RDD locally, assuming it has only a single partition and no dependencies.
* We run the operation in a separate thread just in case it takes a bunch of time, so that we
@@ -878,7 +854,7 @@ class DAGScheduler(
// If the RDD has narrow dependencies, pick the first partition of the first narrow dep
// that has any placement preferences. Ideally we would choose based on transfer sizes,
// but this will do for now.
- rdd.dependencies.foreach(_ match {
+ rdd.dependencies.foreach {
case n: NarrowDependency[_] =>
for (inPart <- n.getParents(partition)) {
val locs = getPreferredLocs(n.rdd, inPart)
@@ -886,7 +862,7 @@ class DAGScheduler(
return locs
}
case _ =>
- })
+ }
Nil
}
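
The hunk above tidies a Scala idiom: instead of wrapping an explicit match in a lambda, a pattern-matching function literal is passed straight to foreach. A standalone sketch of the two equivalent forms (the list and the cases are made up for illustration; they are not Spark's dependency types):

    object MatchDemo extends App {
      val deps: List[Any] = List(1, "shuffle", 3)

      // Original form: an explicit match on the lambda parameter.
      deps.foreach(_ match {
        case n: Int => println("int dependency: " + n)
        case _      => // ignore everything else
      })

      // Idiomatic form used after this change: the cases are the function literal.
      deps.foreach {
        case n: Int => println("int dependency: " + n)
        case _      =>
      }
    }
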
@@ -909,7 +885,7 @@ class DAGScheduler(
}
def stop() {
- eventQueue.put(StopDAGScheduler)
+ eventProcessActor ! StopDAGScheduler
metadataCleaner.cancel()
taskSched.stop()
}
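
Taken together, the DAGScheduler hunks above swap the hand-rolled event loop (a LinkedBlockingQueue polled by a dedicated daemon thread, whose run() and start() methods are removed) for an Akka actor: callers send events with `!` instead of eventQueue.put, and the periodic resubmission of failed stages moves from the poll loop into a timer scheduled in preStart. Below is a minimal sketch of that pattern, assuming the Akka 2.0-era API that Spark used at the time; the event types and handler bodies are illustrative, not Spark's:

    import akka.actor._
    import akka.util.duration._

    // Illustrative events; the real scheduler has its own DAGSchedulerEvent hierarchy.
    sealed trait Event
    case class Work(payload: String) extends Event
    case object Shutdown extends Event

    class EventLoop extends Actor {
      override def preStart() {
        // Periodic housekeeping replaces the old poll(POLL_TIMEOUT) loop:
        // here it just sends the actor a message to itself every 200 ms.
        context.system.scheduler.schedule(200 milliseconds, 200 milliseconds) {
          self ! Work("periodic check")
        }
      }

      def receive = {
        case Work(p)  => println("processing: " + p)
        case Shutdown => context.stop(self)
      }
    }

    object Demo extends App {
      val system = ActorSystem("demo")
      val loop = system.actorOf(Props(new EventLoop))
      loop ! Work("first event")   // non-blocking, replaces eventQueue.put(...)
      Thread.sleep(500)
      loop ! Shutdown
      system.shutdown()
    }
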
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index cec02e945c..1caa88e61f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -33,10 +33,6 @@ private[spark] class SimrSchedulerBackend(
val tmpPath = new Path(driverFilePath + "_tmp")
val filePath = new Path(driverFilePath)
- val uiFilePath = driverFilePath + "_ui"
- val tmpUiPath = new Path(uiFilePath + "_tmp")
- val uiPath = new Path(uiFilePath)
-
val maxCores = System.getProperty("spark.simr.executor.cores", "1").toInt
override def start() {
@@ -51,23 +47,17 @@ private[spark] class SimrSchedulerBackend(
logInfo("Writing to HDFS file: " + driverFilePath)
logInfo("Writing Akka address: " + driverUrl)
- logInfo("Writing to HDFS file: " + uiFilePath)
logInfo("Writing Spark UI Address: " + sc.ui.appUIAddress)
// Create temporary file to prevent race condition where executors get empty driverUrl file
val temp = fs.create(tmpPath, true)
temp.writeUTF(driverUrl)
temp.writeInt(maxCores)
+ temp.writeUTF(sc.ui.appUIAddress)
temp.close()
// "Atomic" rename
fs.rename(tmpPath, filePath)
-
- // Write Spark UI Address to file
- val uiTemp = fs.create(tmpUiPath, true)
- uiTemp.writeUTF(sc.ui.appUIAddress)
- uiTemp.close()
- fs.rename(tmpUiPath, uiPath)
}
override def stop() {
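
The SimrSchedulerBackend change above drops the separate "_ui" file and appends the UI address to the single driver file, still staging the write in a temp file and renaming it so executors never observe a partially written file. Here is a sketch of that write-then-rename pattern with Hadoop's FileSystem API, plus a hypothetical reader that consumes the fields in the same order; the paths and addresses are made up:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object DriverFileExample {
      def main(args: Array[String]) {
        val fs  = FileSystem.get(new Configuration())
        val tmp = new Path("/tmp/driver_file_tmp")
        val dst = new Path("/tmp/driver_file")

        // Writer: stage everything in a temp file, then rename, so readers
        // never see a half-written file.
        val out = fs.create(tmp, true)
        out.writeUTF("akka://spark@driver-host:7077/user/StandaloneScheduler") // driverUrl
        out.writeInt(1)                                                        // maxCores
        out.writeUTF("http://driver-host:4040")                                // appUIAddress
        out.close()
        fs.rename(tmp, dst)

        // Reader (e.g. a SIMR executor) must read the fields in the same order.
        val in = fs.open(dst)
        val driverUrl = in.readUTF()
        val maxCores  = in.readInt()
        val uiAddress = in.readUTF()
        in.close()
        println(driverUrl + " " + maxCores + " " + uiAddress)
      }
    }
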
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 226ea46cc7..19ee87ecc9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -182,6 +182,7 @@ private[spark] class CoarseMesosSchedulerBackend(
!slaveIdsWithExecutors.contains(slaveId)) {
// Launch an executor on the slave
val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
+ totalCoresAcquired += cpusToUse
val taskId = newMesosTaskId()
taskIdToSlaveId(taskId) = slaveId
slaveIdsWithExecutors += slaveId
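
The one-line addition above makes the backend actually record the cores it claims, so maxCores - totalCoresAcquired shrinks as executors launch and later offers stop being accepted once the budget is spent. A tiny standalone sketch of that bookkeeping (the offer sizes and the budget are made up):

    object CoreAccounting extends App {
      val maxCores = 8                 // stand-in for the core budget
      var totalCoresAcquired = 0
      val offeredCpus = Seq(4, 4, 4)   // three hypothetical resource offers

      for (cpus <- offeredCpus if totalCoresAcquired < maxCores) {
        val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
        totalCoresAcquired += cpusToUse   // the line added above; omit it and every offer is taken in full
        println("launching executor with " + cpusToUse + " cores")
      }
      // Prints two launches of 4 cores each; the third offer is skipped.
    }
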
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index a34c95b6f0..702aca8323 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -891,9 +891,9 @@ private[spark] object BlockManager extends Logging {
blockManagerMaster: BlockManagerMaster = null)
: Map[BlockId, Seq[BlockManagerId]] =
{
- // env == null and blockManagerMaster != null is used in tests
+ // blockManagerMaster != null is used in tests
assert (env != null || blockManagerMaster != null)
- val blockLocations: Seq[Seq[BlockManagerId]] = if (env != null) {
+ val blockLocations: Seq[Seq[BlockManagerId]] = if (blockManagerMaster == null) {
env.blockManager.getLocationBlockIds(blockIds)
} else {
blockManagerMaster.getLocations(blockIds)
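
The final hunk flips the branch condition from env != null to blockManagerMaster == null: in tests both a SparkEnv and a mock BlockManagerMaster may be non-null, and the explicitly supplied master should win. A minimal sketch of that selection logic with stand-in classes (Env and Master here are not Spark types):

    object SelectLocationSource extends App {
      class Env    { def lookup(id: String) = Seq("location-from-env") }
      class Master { def lookup(id: String) = Seq("location-from-mock-master") }

      def locations(id: String, env: Env, master: Master): Seq[String] = {
        assert(env != null || master != null)
        // Keying on master == null (rather than env != null) lets a test pass a
        // non-null env *and* a mock master, and still exercise the mock.
        if (master == null) env.lookup(id) else master.lookup(id)
      }

      println(locations("block-1", new Env, null))         // production path: uses env
      println(locations("block-1", new Env, new Master))   // test path: mock master wins
    }
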