From 2539c0674501432fb62073577db6da52a26db850 Mon Sep 17 00:00:00 2001 From: "Lian, Cheng" Date: Sat, 9 Nov 2013 19:05:18 +0800 Subject: Replaced the daemon thread started by DAGScheduler with an actor --- .../main/scala/org/apache/spark/SparkContext.scala | 1 - .../org/apache/spark/scheduler/DAGScheduler.scala | 105 +++++++++------------ .../org/apache/spark/storage/BlockManager.scala | 2 +- .../apache/spark/scheduler/DAGSchedulerSuite.scala | 2 +- 4 files changed, 45 insertions(+), 65 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 880b49e8ef..03ffcc6f9c 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -238,7 +238,6 @@ class SparkContext( taskScheduler.start() @volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler) - dagScheduler.start() ui.start() diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index d0b21e896e..cb19969369 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -19,9 +19,9 @@ package org.apache.spark.scheduler import java.io.NotSerializableException import java.util.Properties -import java.util.concurrent.{LinkedBlockingQueue, TimeUnit} import java.util.concurrent.atomic.AtomicInteger +import akka.actor.{Props, Actor, ActorRef} import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map} import org.apache.spark._ @@ -65,12 +65,12 @@ class DAGScheduler( // Called by TaskScheduler to report task's starting. def taskStarted(task: Task[_], taskInfo: TaskInfo) { - eventQueue.put(BeginEvent(task, taskInfo)) + eventProcessActor ! BeginEvent(task, taskInfo) } // Called to report that a task has completed and results are being fetched remotely. def taskGettingResult(task: Task[_], taskInfo: TaskInfo) { - eventQueue.put(GettingResultEvent(task, taskInfo)) + eventProcessActor ! GettingResultEvent(task, taskInfo) } // Called by TaskScheduler to report task completions or failures. @@ -81,23 +81,23 @@ class DAGScheduler( accumUpdates: Map[Long, Any], taskInfo: TaskInfo, taskMetrics: TaskMetrics) { - eventQueue.put(CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics)) + eventProcessActor ! CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics) } // Called by TaskScheduler when an executor fails. def executorLost(execId: String) { - eventQueue.put(ExecutorLost(execId)) + eventProcessActor ! ExecutorLost(execId) } // Called by TaskScheduler when a host is added def executorGained(execId: String, host: String) { - eventQueue.put(ExecutorGained(execId, host)) + eventProcessActor ! ExecutorGained(execId, host) } // Called by TaskScheduler to cancel an entire TaskSet due to either repeated failures or // cancellation of the job itself. def taskSetFailed(taskSet: TaskSet, reason: String) { - eventQueue.put(TaskSetFailed(taskSet, reason)) + eventProcessActor ! 
TaskSetFailed(taskSet, reason) } // The time, in millis, to wait for fetch failure events to stop coming in after one is detected; @@ -109,7 +109,36 @@ class DAGScheduler( // resubmit failed stages val POLL_TIMEOUT = 10L - private val eventQueue = new LinkedBlockingQueue[DAGSchedulerEvent] + private val eventProcessActor: ActorRef = env.actorSystem.actorOf(Props(new Actor { + /** + * The main event loop of the DAG scheduler, which waits for new-job / task-finished / failure + * events and responds by launching tasks. This runs in a dedicated thread and receives events + * via the eventQueue. + */ + def receive = { + case event: DAGSchedulerEvent => + if (event != null) { + logDebug("Got event of type " + event.getClass.getName) + } + + if (!processEvent(event)) { + val time = System.currentTimeMillis() // TODO: use a pluggable clock for testability + // Periodically resubmit failed stages if some map output fetches have failed and we have + // waited at least RESUBMIT_TIMEOUT. We wait for this short time because when a node fails, + // tasks on many other nodes are bound to get a fetch failure, and they won't all get it at + // the same time, so we want to make sure we've identified all the reduce tasks that depend + // on the failed node. + if (failed.size > 0 && time > lastFetchFailureTime + RESUBMIT_TIMEOUT) { + resubmitFailedStages() + } else { + submitWaitingStages() + } + } + else { + context.stop(self) + } + } + })) private[scheduler] val nextJobId = new AtomicInteger(0) @@ -150,16 +179,6 @@ class DAGScheduler( val metadataCleaner = new MetadataCleaner(MetadataCleanerType.DAG_SCHEDULER, this.cleanup) - // Start a thread to run the DAGScheduler event loop - def start() { - new Thread("DAGScheduler") { - setDaemon(true) - override def run() { - DAGScheduler.this.run() - } - }.start() - } - def addSparkListener(listener: SparkListener) { listenerBus.addListener(listener) } @@ -301,8 +320,7 @@ class DAGScheduler( assert(partitions.size > 0) val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler) - eventQueue.put(JobSubmitted(jobId, rdd, func2, partitions.toArray, allowLocal, callSite, - waiter, properties)) + eventProcessActor ! JobSubmitted(jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties) waiter } @@ -337,8 +355,7 @@ class DAGScheduler( val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] val partitions = (0 until rdd.partitions.size).toArray val jobId = nextJobId.getAndIncrement() - eventQueue.put(JobSubmitted(jobId, rdd, func2, partitions, allowLocal = false, callSite, - listener, properties)) + eventProcessActor ! JobSubmitted(jobId, rdd, func2, partitions, allowLocal = false, callSite, listener, properties) listener.awaitResult() // Will throw an exception if the job fails } @@ -347,19 +364,19 @@ class DAGScheduler( */ def cancelJob(jobId: Int) { logInfo("Asked to cancel job " + jobId) - eventQueue.put(JobCancelled(jobId)) + eventProcessActor ! JobCancelled(jobId) } def cancelJobGroup(groupId: String) { logInfo("Asked to cancel job group " + groupId) - eventQueue.put(JobGroupCancelled(groupId)) + eventProcessActor ! JobGroupCancelled(groupId) } /** * Cancel all jobs that are running or waiting in the queue. */ def cancelAllJobs() { - eventQueue.put(AllJobsCancelled) + eventProcessActor ! 
AllJobsCancelled } /** @@ -474,42 +491,6 @@ class DAGScheduler( } } - - /** - * The main event loop of the DAG scheduler, which waits for new-job / task-finished / failure - * events and responds by launching tasks. This runs in a dedicated thread and receives events - * via the eventQueue. - */ - private def run() { - SparkEnv.set(env) - - while (true) { - val event = eventQueue.poll(POLL_TIMEOUT, TimeUnit.MILLISECONDS) - if (event != null) { - logDebug("Got event of type " + event.getClass.getName) - } - this.synchronized { // needed in case other threads makes calls into methods of this class - if (event != null) { - if (processEvent(event)) { - return - } - } - - val time = System.currentTimeMillis() // TODO: use a pluggable clock for testability - // Periodically resubmit failed stages if some map output fetches have failed and we have - // waited at least RESUBMIT_TIMEOUT. We wait for this short time because when a node fails, - // tasks on many other nodes are bound to get a fetch failure, and they won't all get it at - // the same time, so we want to make sure we've identified all the reduce tasks that depend - // on the failed node. - if (failed.size > 0 && time > lastFetchFailureTime + RESUBMIT_TIMEOUT) { - resubmitFailedStages() - } else { - submitWaitingStages() - } - } - } - } - /** * Run a job on an RDD locally, assuming it has only a single partition and no dependencies. * We run the operation in a separate thread just in case it takes a bunch of time, so that we @@ -909,7 +890,7 @@ class DAGScheduler( } def stop() { - eventQueue.put(StopDAGScheduler) + eventProcessActor ! StopDAGScheduler metadataCleaner.cancel() taskSched.stop() } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index a34c95b6f0..2c21134393 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -893,7 +893,7 @@ private[spark] object BlockManager extends Logging { { // env == null and blockManagerMaster != null is used in tests assert (env != null || blockManagerMaster != null) - val blockLocations: Seq[Seq[BlockManagerId]] = if (env != null) { + val blockLocations: Seq[Seq[BlockManagerId]] = if (blockManagerMaster == null) { env.blockManager.getLocationBlockIds(blockIds) } else { blockManagerMaster.getLocations(blockIds) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 00f2fdd657..a4d41ebbff 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -100,7 +100,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont cacheLocations.clear() results.clear() mapOutputTracker = new MapOutputTrackerMaster() - scheduler = new DAGScheduler(taskScheduler, mapOutputTracker, blockManagerMaster, null) { + scheduler = new DAGScheduler(taskScheduler, mapOutputTracker, blockManagerMaster, sc.env) { override def runLocally(job: ActiveJob) { // don't bother with the thread while unit testing runLocallyWithinThread(job) -- cgit v1.2.3 From 765ebca04f3dce1685c64022425bd281993be90e Mon Sep 17 00:00:00 2001 From: "Lian, Cheng" Date: Sat, 9 Nov 2013 21:13:03 +0800 Subject: Remove unnecessary null checking --- core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 4 +--- 1 file changed, 
1 insertion(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index cb19969369..a73a6e19f4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -117,9 +117,7 @@ class DAGScheduler( */ def receive = { case event: DAGSchedulerEvent => - if (event != null) { - logDebug("Got event of type " + event.getClass.getName) - } + logDebug("Got event of type " + event.getClass.getName) if (!processEvent(event)) { val time = System.currentTimeMillis() // TODO: use a pluggable clock for testability -- cgit v1.2.3 From ba552851771cf8eaf90b72b661c3df60080d0ef9 Mon Sep 17 00:00:00 2001 From: "Lian, Cheng" Date: Mon, 11 Nov 2013 01:25:35 +0800 Subject: Put the periodical resubmitFailedStages() call into a scheduled task --- .../org/apache/spark/scheduler/DAGScheduler.scala | 28 ++++++++++------------ 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index a73a6e19f4..74995706a8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -21,7 +21,8 @@ import java.io.NotSerializableException import java.util.Properties import java.util.concurrent.atomic.AtomicInteger -import akka.actor.{Props, Actor, ActorRef} +import akka.actor._ +import akka.util.duration._ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map} import org.apache.spark._ @@ -110,6 +111,13 @@ class DAGScheduler( val POLL_TIMEOUT = 10L private val eventProcessActor: ActorRef = env.actorSystem.actorOf(Props(new Actor { + override def preStart() { + env.actorSystem.scheduler.schedule(RESUBMIT_TIMEOUT milliseconds, RESUBMIT_TIMEOUT milliseconds) { + if (failed.size > 0) + resubmitFailedStages() + } + } + /** * The main event loop of the DAG scheduler, which waits for new-job / task-finished / failure * events and responds by launching tasks. This runs in a dedicated thread and receives events @@ -119,22 +127,10 @@ class DAGScheduler( case event: DAGSchedulerEvent => logDebug("Got event of type " + event.getClass.getName) - if (!processEvent(event)) { - val time = System.currentTimeMillis() // TODO: use a pluggable clock for testability - // Periodically resubmit failed stages if some map output fetches have failed and we have - // waited at least RESUBMIT_TIMEOUT. We wait for this short time because when a node fails, - // tasks on many other nodes are bound to get a fetch failure, and they won't all get it at - // the same time, so we want to make sure we've identified all the reduce tasks that depend - // on the failed node. 
- if (failed.size > 0 && time > lastFetchFailureTime + RESUBMIT_TIMEOUT) { - resubmitFailedStages() - } else { - submitWaitingStages() - } - } - else { + if (!processEvent(event)) + submitWaitingStages() + else context.stop(self) - } } })) -- cgit v1.2.3 From e2a43b3dcce81fc99098510d09095e1be4bf3e29 Mon Sep 17 00:00:00 2001 From: "Lian, Cheng" Date: Mon, 11 Nov 2013 12:21:54 +0800 Subject: Made some changes according to suggestions from @aarondav --- .../src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 9 +++++---- core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 74995706a8..42bb3884c8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -112,9 +112,10 @@ class DAGScheduler( private val eventProcessActor: ActorRef = env.actorSystem.actorOf(Props(new Actor { override def preStart() { - env.actorSystem.scheduler.schedule(RESUBMIT_TIMEOUT milliseconds, RESUBMIT_TIMEOUT milliseconds) { - if (failed.size > 0) + context.system.scheduler.schedule(RESUBMIT_TIMEOUT milliseconds, RESUBMIT_TIMEOUT milliseconds) { + if (failed.size > 0) { resubmitFailedStages() + } } } @@ -853,7 +854,7 @@ class DAGScheduler( // If the RDD has narrow dependencies, pick the first partition of the first narrow dep // that has any placement preferences. Ideally we would choose based on transfer sizes, // but this will do for now. - rdd.dependencies.foreach(_ match { + rdd.dependencies.foreach { case n: NarrowDependency[_] => for (inPart <- n.getParents(partition)) { val locs = getPreferredLocs(n.rdd, inPart) @@ -861,7 +862,7 @@ class DAGScheduler( return locs } case _ => - }) + } Nil } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 2c21134393..702aca8323 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -891,7 +891,7 @@ private[spark] object BlockManager extends Logging { blockManagerMaster: BlockManagerMaster = null) : Map[BlockId, Seq[BlockManagerId]] = { - // env == null and blockManagerMaster != null is used in tests + // blockManagerMaster != null is used in tests assert (env != null || blockManagerMaster != null) val blockLocations: Seq[Seq[BlockManagerId]] = if (blockManagerMaster == null) { env.blockManager.getLocationBlockIds(blockIds) -- cgit v1.2.3 From 23146a67052889797d6761388cbc19ae6bfe6e21 Mon Sep 17 00:00:00 2001 From: Nathan Howell Date: Tue, 12 Nov 2013 13:17:48 -0800 Subject: spark-assembly.jar fails to authenticate with YARN ResourceManager sbt-assembly is setup to pick the first META-INF/services/org.apache.hadoop.security.SecurityInfo file instead of merging them. 
This causes Kerberos authentication to fail, this manifests itself in the "info:null" debug log statement: DEBUG SaslRpcClient: Get token info proto:interface org.apache.hadoop.yarn.api.ApplicationClientProtocolPB info:null DEBUG SaslRpcClient: Get kerberos info proto:interface org.apache.hadoop.yarn.api.ApplicationClientProtocolPB info:null ERROR UserGroupInformation: PriviledgedActionException as:foo@BAR (auth:KERBEROS) cause:org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS] DEBUG UserGroupInformation: PrivilegedAction as:foo@BAR (auth:KERBEROS) from:org.apache.hadoop.ipc.Client$Connection.handleSaslConnectionFailure(Client.java:583) WARN Client: Exception encountered while connecting to the server : org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS] ERROR UserGroupInformation: PriviledgedActionException as:foo@BAR (auth:KERBEROS) cause:java.io.IOException: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS] This previously would just contain a single class: $ unzip -c assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop2.2.0.jar META-INF/services/org.apache.hadoop.security.SecurityInfo Archive: assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop2.2.0.jar inflating: META-INF/services/org.apache.hadoop.security.SecurityInfo org.apache.hadoop.security.AnnotatedSecurityInfo And now has the full list of classes: $ unzip -c assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop2.2.0.jar META-INF/services/org.apache.hadoop.security.SecurityInfoArchive: assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop2.2.0.jar inflating: META-INF/services/org.apache.hadoop.security.SecurityInfo org.apache.hadoop.security.AnnotatedSecurityInfo org.apache.hadoop.mapreduce.v2.app.MRClientSecurityInfo org.apache.hadoop.mapreduce.v2.security.client.ClientHSSecurityInfo org.apache.hadoop.yarn.security.client.ClientRMSecurityInfo org.apache.hadoop.yarn.security.ContainerManagerSecurityInfo org.apache.hadoop.yarn.security.SchedulerSecurityInfo org.apache.hadoop.yarn.security.admin.AdminSecurityInfo org.apache.hadoop.yarn.server.RMNMSecurityInfoClass --- project/SparkBuild.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 115570dbe2..2db167fe04 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -327,7 +327,7 @@ object SparkBuild extends Build { case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard case "log4j.properties" => MergeStrategy.discard - case "META-INF/services/org.apache.hadoop.fs.FileSystem" => MergeStrategy.concat + case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines case "reference.conf" => MergeStrategy.concat case _ => MergeStrategy.first } -- cgit v1.2.3 From 48eac0bcbf8ab80237af5f50abd4f7734a4837eb Mon Sep 17 00:00:00 2001 From: Nathan Howell Date: Tue, 12 Nov 2013 13:18:35 -0800 Subject: Upgrade to sbt-assembly 0.9.2 --- project/plugins.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/plugins.sbt b/project/plugins.sbt index cfcd85082a..4ba0e4280a 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -4,7 +4,7 @@ resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/release 
resolvers += "Spray Repository" at "http://repo.spray.cc/" -addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.9.1") +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.9.2") addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.2.0") -- cgit v1.2.3 From 0ea1f8b225031d5d2e44af4147ab4c8cfff4febc Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Wed, 13 Nov 2013 15:23:36 -0800 Subject: Write Spark UI url to driver file on HDFS --- .../spark/scheduler/cluster/SimrSchedulerBackend.scala | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala index 6b91935400..0ea35e2b7a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala @@ -31,10 +31,6 @@ private[spark] class SimrSchedulerBackend( val tmpPath = new Path(driverFilePath + "_tmp") val filePath = new Path(driverFilePath) - val uiFilePath = driverFilePath + "_ui" - val tmpUiPath = new Path(uiFilePath + "_tmp") - val uiPath = new Path(uiFilePath) - val maxCores = System.getProperty("spark.simr.executor.cores", "1").toInt override def start() { @@ -49,23 +45,17 @@ private[spark] class SimrSchedulerBackend( logInfo("Writing to HDFS file: " + driverFilePath) logInfo("Writing Akka address: " + driverUrl) - logInfo("Writing to HDFS file: " + uiFilePath) logInfo("Writing Spark UI Address: " + sc.ui.appUIAddress) // Create temporary file to prevent race condition where executors get empty driverUrl file val temp = fs.create(tmpPath, true) temp.writeUTF(driverUrl) temp.writeInt(maxCores) + temp.writeUTF(sc.ui.appUIAddress) temp.close() // "Atomic" rename fs.rename(tmpPath, filePath) - - // Write Spark UI Address to file - val uiTemp = fs.create(tmpUiPath, true) - uiTemp.writeUTF(sc.ui.appUIAddress) - uiTemp.close() - fs.rename(tmpUiPath, uiPath) } override def stop() { -- cgit v1.2.3 From 5125cd34663b83edceaa40deaf5f7f48a12138e5 Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Wed, 13 Nov 2013 23:06:17 -0800 Subject: Don't ignore spark.cores.max when using Mesos Coarse mode --- .../spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 300fe693f1..cd521e0f2b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -181,6 +181,7 @@ private[spark] class CoarseMesosSchedulerBackend( !slaveIdsWithExecutors.contains(slaveId)) { // Launch an executor on the slave val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired) + totalCoresAcquired += cpusToUse val taskId = newMesosTaskId() taskIdToSlaveId(taskId) = slaveId slaveIdsWithExecutors += slaveId -- cgit v1.2.3 From cc8995c8f4bf2a447199c5ff7796bebd4599ce51 Mon Sep 17 00:00:00 2001 From: "Lian, Cheng" Date: Thu, 14 Nov 2013 18:17:05 +0800 Subject: Fixed a scaladoc typo in HadoopRDD.scala --- core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 47e958b5e6..53f77a38f5 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -52,7 +52,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp * sources in HBase, or S3). * * @param sc The SparkContext to associate the RDD with. - * @param broadCastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed + * @param broadcastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed * variabe references an instance of JobConf, then that JobConf will be used for the Hadoop job. * Otherwise, a new JobConf will be created on each slave using the enclosed Configuration. * @param initLocalJobConfFuncOpt Optional closure used to initialize any JobConf that HadoopRDD -- cgit v1.2.3 From bef398e572c7d4ee5a0e1e7c997e0adebc6e030a Mon Sep 17 00:00:00 2001 From: RIA-pierre-borckmans Date: Thu, 14 Nov 2013 11:33:48 +0100 Subject: Fixed typos in the CDH4 distributions version codes. --- docs/hadoop-third-party-distributions.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/hadoop-third-party-distributions.md b/docs/hadoop-third-party-distributions.md index f706625fe9..b33af2cf24 100644 --- a/docs/hadoop-third-party-distributions.md +++ b/docs/hadoop-third-party-distributions.md @@ -25,8 +25,8 @@ the _exact_ Hadoop version you are running to avoid any compatibility errors.

 <h3>CDH Releases</h3>
 <table class="table">
   <tr><th>Release</th><th>Version code</th></tr>
-  <tr><td>CDH 4.X.X (YARN mode)</td><td>2.0.0-chd4.X.X</td></tr>
-  <tr><td>CDH 4.X.X</td><td>2.0.0-mr1-chd4.X.X</td></tr>
+  <tr><td>CDH 4.X.X (YARN mode)</td><td>2.0.0-cdh4.X.X</td></tr>
+  <tr><td>CDH 4.X.X</td><td>2.0.0-mr1-cdh4.X.X</td></tr>
   <tr><td>CDH 3u6</td><td>0.20.2-cdh3u6</td></tr>
   <tr><td>CDH 3u5</td><td>0.20.2-cdh3u5</td></tr>
   <tr><td>CDH 3u4</td><td>0.20.2-cdh3u4</td></tr>
-- cgit v1.2.3 From b4546ba9e694529c359b7ca5c26829ead2c07f1a Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Thu, 14 Nov 2013 13:33:11 -0800 Subject: Fix bug where scheduler could hang after task failure. When a task fails, we need to call reviveOffers() so that the task can be rescheduled on a different machine. In the current code, the state in ClusterTaskSetManager indicating which tasks are pending may be updated after revive offers is called (there's a race condition here), so when revive offers is called, the task set manager does not yet realize that there are failed tasks that need to be relaunched. --- .../apache/spark/scheduler/cluster/ClusterScheduler.scala | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala index 53a589615d..c1e65a3c48 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala @@ -249,7 +249,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext) def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) { var failedExecutor: Option[String] = None - var taskFailed = false synchronized { try { if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) { @@ -269,9 +268,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext) } taskIdToExecutorId.remove(tid) } - if (state == TaskState.FAILED) { - taskFailed = true - } activeTaskSets.get(taskSetId).foreach { taskSet => if (state == TaskState.FINISHED) { taskSet.removeRunningTask(tid) @@ -293,10 +289,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext) dagScheduler.executorLost(failedExecutor.get) backend.reviveOffers() } - if (taskFailed) { - // Also revive offers if a task had failed for some reason other than host lost - backend.reviveOffers() - } } def handleTaskGettingResult(taskSetManager: ClusterTaskSetManager, tid: Long) { @@ -316,8 +308,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext) taskState: TaskState, reason: Option[TaskEndReason]) = synchronized { taskSetManager.handleFailedTask(tid, taskState, reason) - if (taskState == TaskState.FINISHED) { - // The task finished successfully but the result was lost, so we should revive offers. + if (taskState != TaskState.KILLED) { + // Need to revive offers again now that the task set manager state has been updated to + // reflect failed tasks that need to be re-run. 
backend.reviveOffers() } } -- cgit v1.2.3 From 29c88e408ecc3416104530756561fee482393913 Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Thu, 14 Nov 2013 15:15:19 -0800 Subject: Don't retry tasks when they fail due to a NotSerializableException --- .../apache/spark/scheduler/cluster/ClusterTaskSetManager.scala | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala index ee47aaffca..4c5eca8537 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala @@ -17,6 +17,7 @@ package org.apache.spark.scheduler.cluster +import java.io.NotSerializableException import java.util.Arrays import scala.collection.mutable.ArrayBuffer @@ -484,6 +485,14 @@ private[spark] class ClusterTaskSetManager( case ef: ExceptionFailure => sched.dagScheduler.taskEnded(tasks(index), ef, null, null, info, ef.metrics.getOrElse(null)) + if (ef.className == classOf[NotSerializableException].getName()) { + // If the task result wasn't serializable, there's no point in trying to re-execute it. + logError("Task %s:%s had a not serializable result: %s; not retrying".format( + taskSet.id, index, ef.description)) + abort("Task %s:%s had a not serializable result: %s".format( + taskSet.id, index, ef.description)) + return + } val key = ef.description val now = clock.getTime() val (printFull, dupCount) = { -- cgit v1.2.3
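
A minimal, self-contained sketch of the event-loop pattern the DAGScheduler patches above converge on — not Spark's actual code. It assumes the Akka 2.0-era API the patches import (akka.util.duration._, parameterless system.shutdown()), and the event and method names are stand-ins for their DAGScheduler counterparts: callers send messages to an actor instead of calling eventQueue.put(), and the periodic resubmission of failed stages becomes a task scheduled from preStart() instead of a poll timeout inside a hand-rolled run() loop. (The actual patch calls resubmitFailedStages() directly inside the scheduled block; here a message is sent to self so all state stays on the actor.)

import akka.actor._
import akka.util.duration._

sealed trait DAGSchedulerEvent
case class JobSubmitted(jobId: Int) extends DAGSchedulerEvent
case object ResubmitFailedStages extends DAGSchedulerEvent
case object StopDAGScheduler extends DAGSchedulerEvent

class EventProcessActor extends Actor {
  private val RESUBMIT_TIMEOUT = 50L   // millis, playing the role of DAGScheduler.RESUBMIT_TIMEOUT

  override def preStart() {
    // Replaces the POLL_TIMEOUT-based check at the bottom of the old run() loop.
    context.system.scheduler.schedule(RESUBMIT_TIMEOUT milliseconds, RESUBMIT_TIMEOUT milliseconds) {
      self ! ResubmitFailedStages
    }
  }

  def receive = {
    case JobSubmitted(jobId) =>
      // processEvent(event) would run here; the actor mailbox serializes events,
      // so the old `this.synchronized` block is no longer needed.
      println("processing job " + jobId)
    case ResubmitFailedStages =>
      // In the real scheduler: if (failed.size > 0) resubmitFailedStages()
      println("checking for failed stages to resubmit")
    case StopDAGScheduler =>
      context.stop(self)
  }
}

object EventLoopSketch extends App {
  val system = ActorSystem("sketch")
  val eventProcessActor = system.actorOf(Props(new EventProcessActor))
  eventProcessActor ! JobSubmitted(0)     // was: eventQueue.put(JobSubmitted(...))
  Thread.sleep(200)
  eventProcessActor ! StopDAGScheduler    // was: eventQueue.put(StopDAGScheduler)
  system.shutdown()
}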
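
The sbt-assembly fix above works because java.util.ServiceLoader discovers providers by reading every META-INF/services/<interface> registry on the classpath, one fully-qualified implementation class per line; an assembly that keeps only the first jar's copy of that file silently drops every other jar's providers, which is what broke Hadoop's Kerberos SecurityInfo lookup. The sketch below is not part of the patches (the resource name is taken from the commit message): it simply dumps every copy of that registry visible on the current classpath, which is a quick way to see the lines MergeStrategy.filterDistinctLines has to preserve.

import java.net.URL
import scala.collection.JavaConverters._
import scala.io.Source

object ServiceRegistryDump extends App {
  // Resource name from the commit message; any ServiceLoader registry can be inspected this way.
  val resource = "META-INF/services/org.apache.hadoop.security.SecurityInfo"
  val copies: List[URL] = getClass.getClassLoader.getResources(resource).asScala.toList
  if (copies.isEmpty) println("no copies of " + resource + " on the classpath")
  copies.foreach { url =>
    println(url)   // which jar contributed this copy
    Source.fromURL(url).getLines()
      .map(_.trim)
      .filter(line => line.nonEmpty && !line.startsWith("#"))   // one provider class per line
      .foreach(line => println("  " + line))
  }
}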