From 418e36caa8fcd9a70026ab762ec709732fdebd6b Mon Sep 17 00:00:00 2001 From: Stephen Haberman Date: Thu, 31 Jan 2013 17:18:33 -0600 Subject: Add more private declarations. --- core/src/main/scala/spark/MapOutputTracker.scala | 2 +- .../scala/spark/deploy/master/MasterWebUI.scala | 22 ++++------- .../main/scala/spark/scheduler/DAGScheduler.scala | 46 +++++++++++----------- .../scala/spark/scheduler/ShuffleMapTask.scala | 3 +- .../spark/scheduler/cluster/ClusterScheduler.scala | 2 +- .../spark/scheduler/cluster/TaskSetManager.scala | 19 ++++----- .../spark/scheduler/local/LocalScheduler.scala | 4 +- .../main/scala/spark/util/MetadataCleaner.scala | 10 ++--- 8 files changed, 49 insertions(+), 59 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala index aaf433b324..4735207585 100644 --- a/core/src/main/scala/spark/MapOutputTracker.scala +++ b/core/src/main/scala/spark/MapOutputTracker.scala @@ -170,7 +170,7 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isDriver: Boolea } } - def cleanup(cleanupTime: Long) { + private def cleanup(cleanupTime: Long) { mapStatuses.clearOldValues(cleanupTime) cachedSerializedStatuses.clearOldValues(cleanupTime) } diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala index a01774f511..529f72e9da 100644 --- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala +++ b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala @@ -45,13 +45,9 @@ class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Direct case (jobId, Some(js)) if (js.equalsIgnoreCase("json")) => val future = master ? RequestMasterState val jobInfo = for (masterState <- future.mapTo[MasterState]) yield { - masterState.activeJobs.find(_.id == jobId) match { - case Some(job) => job - case _ => masterState.completedJobs.find(_.id == jobId) match { - case Some(job) => job - case _ => null - } - } + masterState.activeJobs.find(_.id == jobId).getOrElse({ + masterState.completedJobs.find(_.id == jobId).getOrElse(null) + }) } respondWithMediaType(MediaTypes.`application/json`) { ctx => ctx.complete(jobInfo.mapTo[JobInfo]) @@ -61,14 +57,10 @@ class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Direct val future = master ? RequestMasterState future.map { state => val masterState = state.asInstanceOf[MasterState] - - masterState.activeJobs.find(_.id == jobId) match { - case Some(job) => spark.deploy.master.html.job_details.render(job) - case _ => masterState.completedJobs.find(_.id == jobId) match { - case Some(job) => spark.deploy.master.html.job_details.render(job) - case _ => null - } - } + val job = masterState.activeJobs.find(_.id == jobId).getOrElse({ + masterState.completedJobs.find(_.id == jobId).getOrElse(null) + }) + spark.deploy.master.html.job_details.render(job) } } } diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index b130be6a38..14f61f7e87 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -97,7 +97,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with } }.start() - def getCacheLocs(rdd: RDD[_]): Array[List[String]] = { + private def getCacheLocs(rdd: RDD[_]): Array[List[String]] = { if (!cacheLocs.contains(rdd.id)) { val blockIds = rdd.splits.indices.map(index=> "rdd_%d_%d".format(rdd.id, index)).toArray cacheLocs(rdd.id) = blockManagerMaster.getLocations(blockIds).map { @@ -107,7 +107,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with cacheLocs(rdd.id) } - def clearCacheLocs() { + private def clearCacheLocs() { cacheLocs.clear() } @@ -116,7 +116,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with * The priority value passed in will be used if the stage doesn't already exist with * a lower priority (we assume that priorities always increase across jobs for now). */ - def getShuffleMapStage(shuffleDep: ShuffleDependency[_,_], priority: Int): Stage = { + private def getShuffleMapStage(shuffleDep: ShuffleDependency[_,_], priority: Int): Stage = { shuffleToMapStage.get(shuffleDep.shuffleId) match { case Some(stage) => stage case None => @@ -131,11 +131,11 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with * as a result stage for the final RDD used directly in an action. The stage will also be given * the provided priority. */ - def newStage(rdd: RDD[_], shuffleDep: Option[ShuffleDependency[_,_]], priority: Int): Stage = { - // Kind of ugly: need to register RDDs with the cache and map output tracker here - // since we can't do it in the RDD constructor because # of splits is unknown - logInfo("Registering RDD " + rdd.id + " (" + rdd.origin + ")") + private def newStage(rdd: RDD[_], shuffleDep: Option[ShuffleDependency[_,_]], priority: Int): Stage = { if (shuffleDep != None) { + // Kind of ugly: need to register RDDs with the cache and map output tracker here + // since we can't do it in the RDD constructor because # of splits is unknown + logInfo("Registering RDD " + rdd.id + " (" + rdd.origin + ")") mapOutputTracker.registerShuffle(shuffleDep.get.shuffleId, rdd.splits.size) } val id = nextStageId.getAndIncrement() @@ -148,7 +148,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with * Get or create the list of parent stages for a given RDD. The stages will be assigned the * provided priority if they haven't already been created with a lower priority. */ - def getParentStages(rdd: RDD[_], priority: Int): List[Stage] = { + private def getParentStages(rdd: RDD[_], priority: Int): List[Stage] = { val parents = new HashSet[Stage] val visited = new HashSet[RDD[_]] def visit(r: RDD[_]) { @@ -170,7 +170,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with parents.toList } - def getMissingParentStages(stage: Stage): List[Stage] = { + private def getMissingParentStages(stage: Stage): List[Stage] = { val missing = new HashSet[Stage] val visited = new HashSet[RDD[_]] def visit(rdd: RDD[_]) { @@ -241,7 +241,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with * events and responds by launching tasks. This runs in a dedicated thread and receives events * via the eventQueue. */ - def run() { + private def run() { SparkEnv.set(env) while (true) { @@ -326,7 +326,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with * We run the operation in a separate thread just in case it takes a bunch of time, so that we * don't block the DAGScheduler event loop or other concurrent jobs. */ - def runLocally(job: ActiveJob) { + private def runLocally(job: ActiveJob) { logInfo("Computing the requested partition locally") new Thread("Local computation of job " + job.runId) { override def run() { @@ -349,13 +349,14 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with }.start() } - def submitStage(stage: Stage) { + /** Submits stage, but first recursively submits any missing parents. */ + private def submitStage(stage: Stage) { logDebug("submitStage(" + stage + ")") if (!waiting(stage) && !running(stage) && !failed(stage)) { val missing = getMissingParentStages(stage).sortBy(_.id) logDebug("missing: " + missing) if (missing == Nil) { - logInfo("Submitting " + stage + " (" + stage.origin + "), which has no missing parents") + logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents") submitMissingTasks(stage) running += stage } else { @@ -367,7 +368,8 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with } } - def submitMissingTasks(stage: Stage) { + /** Called when stage's parents are available and we can now do its task. */ + private def submitMissingTasks(stage: Stage) { logDebug("submitMissingTasks(" + stage + ")") // Get our pending tasks and remember them in our pendingTasks entry val myPending = pendingTasks.getOrElseUpdate(stage, new HashSet) @@ -388,7 +390,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with } } if (tasks.size > 0) { - logInfo("Submitting " + tasks.size + " missing tasks from " + stage) + logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")") myPending ++= tasks logDebug("New pending tasks: " + myPending) taskSched.submitTasks( @@ -407,7 +409,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with * Responds to a task finishing. This is called inside the event loop so it assumes that it can * modify the scheduler's internal state. Use taskEnded() to post a task end event from outside. */ - def handleTaskCompletion(event: CompletionEvent) { + private def handleTaskCompletion(event: CompletionEvent) { val task = event.task val stage = idToStage(task.stageId) @@ -492,7 +494,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with waiting --= newlyRunnable running ++= newlyRunnable for (stage <- newlyRunnable.sortBy(_.id)) { - logInfo("Submitting " + stage + " (" + stage.origin + "), which is now runnable") + logInfo("Submitting " + stage + " (" + stage.rdd + "), which is now runnable") submitMissingTasks(stage) } } @@ -541,7 +543,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with * Optionally the generation during which the failure was caught can be passed to avoid allowing * stray fetch failures from possibly retriggering the detection of a node as lost. */ - def handleExecutorLost(execId: String, maybeGeneration: Option[Long] = None) { + private def handleExecutorLost(execId: String, maybeGeneration: Option[Long] = None) { val currentGeneration = maybeGeneration.getOrElse(mapOutputTracker.getGeneration) if (!failedGeneration.contains(execId) || failedGeneration(execId) < currentGeneration) { failedGeneration(execId) = currentGeneration @@ -567,7 +569,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with * Aborts all jobs depending on a particular Stage. This is called in response to a task set * being cancelled by the TaskScheduler. Use taskSetFailed() to inject this event from outside. */ - def abortStage(failedStage: Stage, reason: String) { + private def abortStage(failedStage: Stage, reason: String) { val dependentStages = resultStageToJob.keys.filter(x => stageDependsOn(x, failedStage)).toSeq for (resultStage <- dependentStages) { val job = resultStageToJob(resultStage) @@ -583,7 +585,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with /** * Return true if one of stage's ancestors is target. */ - def stageDependsOn(stage: Stage, target: Stage): Boolean = { + private def stageDependsOn(stage: Stage, target: Stage): Boolean = { if (stage == target) { return true } @@ -610,7 +612,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with visitedRdds.contains(target.rdd) } - def getPreferredLocs(rdd: RDD[_], partition: Int): List[String] = { + private def getPreferredLocs(rdd: RDD[_], partition: Int): List[String] = { // If the partition is cached, return the cache locations val cached = getCacheLocs(rdd)(partition) if (cached != Nil) { @@ -636,7 +638,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with return Nil } - def cleanup(cleanupTime: Long) { + private def cleanup(cleanupTime: Long) { var sizeBefore = idToStage.size idToStage.clearOldValues(cleanupTime) logInfo("idToStage " + sizeBefore + " --> " + idToStage.size) diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala index 83641a2a84..b701b67c89 100644 --- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala @@ -127,7 +127,6 @@ private[spark] class ShuffleMapTask( val bucketId = dep.partitioner.getPartition(pair._1) buckets(bucketId) += pair } - val bucketIterators = buckets.map(_.iterator) val compressedSizes = new Array[Byte](numOutputSplits) @@ -135,7 +134,7 @@ private[spark] class ShuffleMapTask( for (i <- 0 until numOutputSplits) { val blockId = "shuffle_" + dep.shuffleId + "_" + partition + "_" + i // Get a Scala iterator from Java map - val iter: Iterator[(Any, Any)] = bucketIterators(i) + val iter: Iterator[(Any, Any)] = buckets(i).iterator val size = blockManager.put(blockId, iter, StorageLevel.DISK_ONLY, false) compressedSizes(i) = MapOutputTracker.compressSize(size) } diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala index 0b4177805b..1e4fbdb874 100644 --- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala +++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala @@ -86,7 +86,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) } } - def submitTasks(taskSet: TaskSet) { + override def submitTasks(taskSet: TaskSet) { val tasks = taskSet.tasks logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks") this.synchronized { diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala index 26201ad0dd..3dabdd76b1 100644 --- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala +++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala @@ -17,10 +17,7 @@ import java.nio.ByteBuffer /** * Schedules the tasks within a single TaskSet in the ClusterScheduler. */ -private[spark] class TaskSetManager( - sched: ClusterScheduler, - val taskSet: TaskSet) - extends Logging { +private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSet) extends Logging { // Maximum time to wait to run a task in a preferred location (in ms) val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "3000").toLong @@ -100,7 +97,7 @@ private[spark] class TaskSetManager( } // Add a task to all the pending-task lists that it should be on. - def addPendingTask(index: Int) { + private def addPendingTask(index: Int) { val locations = tasks(index).preferredLocations.toSet & sched.hostsAlive if (locations.size == 0) { pendingTasksWithNoPrefs += index @@ -115,7 +112,7 @@ private[spark] class TaskSetManager( // Return the pending tasks list for a given host, or an empty list if // there is no map entry for that host - def getPendingTasksForHost(host: String): ArrayBuffer[Int] = { + private def getPendingTasksForHost(host: String): ArrayBuffer[Int] = { pendingTasksForHost.getOrElse(host, ArrayBuffer()) } @@ -123,7 +120,7 @@ private[spark] class TaskSetManager( // Return None if the list is empty. // This method also cleans up any tasks in the list that have already // been launched, since we want that to happen lazily. - def findTaskFromList(list: ArrayBuffer[Int]): Option[Int] = { + private def findTaskFromList(list: ArrayBuffer[Int]): Option[Int] = { while (!list.isEmpty) { val index = list.last list.trimEnd(1) @@ -137,7 +134,7 @@ private[spark] class TaskSetManager( // Return a speculative task for a given host if any are available. The task should not have an // attempt running on this host, in case the host is slow. In addition, if localOnly is set, the // task must have a preference for this host (or no preferred locations at all). - def findSpeculativeTask(host: String, localOnly: Boolean): Option[Int] = { + private def findSpeculativeTask(host: String, localOnly: Boolean): Option[Int] = { val hostsAlive = sched.hostsAlive speculatableTasks.retain(index => !finished(index)) // Remove finished tasks from set val localTask = speculatableTasks.find { @@ -162,7 +159,7 @@ private[spark] class TaskSetManager( // Dequeue a pending task for a given node and return its index. // If localOnly is set to false, allow non-local tasks as well. - def findTask(host: String, localOnly: Boolean): Option[Int] = { + private def findTask(host: String, localOnly: Boolean): Option[Int] = { val localTask = findTaskFromList(getPendingTasksForHost(host)) if (localTask != None) { return localTask @@ -184,7 +181,7 @@ private[spark] class TaskSetManager( // Does a host count as a preferred location for a task? This is true if // either the task has preferred locations and this host is one, or it has // no preferred locations (in which we still count the launch as preferred). - def isPreferredLocation(task: Task[_], host: String): Boolean = { + private def isPreferredLocation(task: Task[_], host: String): Boolean = { val locs = task.preferredLocations return (locs.contains(host) || locs.isEmpty) } @@ -335,7 +332,7 @@ private[spark] class TaskSetManager( if (numFailures(index) > MAX_TASK_FAILURES) { logError("Task %s:%d failed more than %d times; aborting job".format( taskSet.id, index, MAX_TASK_FAILURES)) - abort("Task %d failed more than %d times".format(index, MAX_TASK_FAILURES)) + abort("Task %s:%d failed more than %d times".format(taskSet.id, index, MAX_TASK_FAILURES)) } } } else { diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala index 9ff7c02097..482d1cc853 100644 --- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala +++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala @@ -53,7 +53,7 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon } def runTask(task: Task[_], idInJob: Int, attemptId: Int) { - logInfo("Running task " + idInJob) + logInfo("Running " + task) // Set the Spark execution environment for the worker thread SparkEnv.set(env) try { @@ -80,7 +80,7 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon val resultToReturn = ser.deserialize[Any](ser.serialize(result)) val accumUpdates = ser.deserialize[collection.mutable.Map[Long, Any]]( ser.serialize(Accumulators.values)) - logInfo("Finished task " + idInJob) + logInfo("Finished " + task) // If the threadpool has not already been shutdown, notify DAGScheduler if (!Thread.currentThread().isInterrupted) diff --git a/core/src/main/scala/spark/util/MetadataCleaner.scala b/core/src/main/scala/spark/util/MetadataCleaner.scala index eaff7ae581..a342d378ff 100644 --- a/core/src/main/scala/spark/util/MetadataCleaner.scala +++ b/core/src/main/scala/spark/util/MetadataCleaner.scala @@ -9,12 +9,12 @@ import spark.Logging * Runs a timer task to periodically clean up metadata (e.g. old files or hashtable entries) */ class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging { - val delaySeconds = MetadataCleaner.getDelaySeconds - val periodSeconds = math.max(10, delaySeconds / 10) - val timer = new Timer(name + " cleanup timer", true) + private val delaySeconds = MetadataCleaner.getDelaySeconds + private val periodSeconds = math.max(10, delaySeconds / 10) + private val timer = new Timer(name + " cleanup timer", true) - val task = new TimerTask { - def run() { + private val task = new TimerTask { + override def run() { try { cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000)) logInfo("Ran metadata cleaner for " + name) -- cgit v1.2.3