author | Patrick Wendell <pwendell@gmail.com> | 2013-12-21 21:08:13 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-12-25 01:19:01 -0800 |
commit | 6a4acc4c2d5c510cc76049dd8727cec76a2173e8 (patch) | |
tree | 17e6582913df202ce24928bdd5ea1b4bf09c4315 /core/src/main/scala/org | |
parent | 1070b566d4cde4e9a69ccd318747b218f5a44dc7 (diff) | |
Initial cut at driver submission.
Diffstat (limited to 'core/src/main/scala/org')
16 files changed, 781 insertions, 53 deletions
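This commit adds first-class driver submission to the standalone deploy mode: a `DriverClient` that sends a single request to the master, master-side scheduling and persistence of `DriverInfo` records, and a worker-side `DriverRunner` that launches the user's main class from a jar. A minimal sketch of the round trip the new protocol enables, assuming an already-created Akka `ActorSystem` and a resolved master actor (`masterActor`, `jarUrl`, and `mainClass` are placeholders, not names from the commit):

```scala
import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

import org.apache.spark.deploy.DriverDescription
import org.apache.spark.deploy.DeployMessages.{RequestSubmitDriver, SubmitDriverResponse}

// Hypothetical helper (not part of the commit): submit a driver and block on the reply.
def submitDriver(masterActor: ActorRef, jarUrl: String, mainClass: String): Boolean = {
  implicit val timeout: Timeout = Timeout(10.seconds)
  val desc = new DriverDescription(jarUrl, mainClass, 512) // 512 MB, the commit's default
  Await.result(masterActor ? RequestSubmitDriver(desc), timeout.duration) match {
    case SubmitDriverResponse(success, message) =>
      println(message) // e.g. "Driver successfully submitted"
      success
  }
}
```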
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 275331724a..67435261e4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -20,10 +20,11 @@ package org.apache.spark.deploy
 import scala.collection.immutable.List
 import org.apache.spark.deploy.ExecutorState.ExecutorState
-import org.apache.spark.deploy.master.{WorkerInfo, ApplicationInfo}
+import org.apache.spark.deploy.master.{DriverInfo, WorkerInfo, ApplicationInfo}
 import org.apache.spark.deploy.master.RecoveryState.MasterState
-import org.apache.spark.deploy.worker.ExecutorRunner
+import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
 import org.apache.spark.util.Utils
+import org.apache.spark.deploy.master.DriverState.DriverState
 private[deploy] sealed trait DeployMessage extends Serializable
@@ -54,7 +55,14 @@
       exitStatus: Option[Int])
     extends DeployMessage
-  case class WorkerSchedulerStateResponse(id: String, executors: List[ExecutorDescription])
+  case class DriverStateChanged(
+      driverId: String,
+      state: DriverState,
+      exception: Option[Exception])
+    extends DeployMessage
+
+  case class WorkerSchedulerStateResponse(id: String, executors: List[ExecutorDescription],
+    driverIds: Seq[String])
   case class Heartbeat(workerId: String) extends DeployMessage
@@ -76,14 +84,19 @@ private[deploy] object DeployMessages {
       sparkHome: String)
     extends DeployMessage
-  // Client to Master
+  case class LaunchDriver(driverId: String, jarUrl: String, mainClass: String, mem: Int)
+    extends DeployMessage
+
+  case class KillDriver(driverId: String) extends DeployMessage
+
+  // AppClient to Master
   case class RegisterApplication(appDescription: ApplicationDescription)
     extends DeployMessage
   case class MasterChangeAcknowledged(appId: String)
-  // Master to Client
+  // Master to AppClient
   case class RegisteredApplication(appId: String, masterUrl: String) extends DeployMessage
@@ -97,11 +110,21 @@ private[deploy] object DeployMessages {
   case class ApplicationRemoved(message: String)
-  // Internal message in Client
+  // DriverClient <-> Master
+
+  case class RequestSubmitDriver(driverDescription: DriverDescription) extends DeployMessage
+
+  case class SubmitDriverResponse(success: Boolean, message: String) extends DeployMessage
+
+  case class RequestKillDriver(driverId: String) extends DeployMessage
+
+  case class KillDriverResponse(success: Boolean, message: String) extends DeployMessage
+
+  // Internal message in AppClient
-  case object StopClient
+  case object StopAppClient
-  // Master to Worker & Client
+  // Master to Worker & AppClient
   case class MasterChanged(masterUrl: String, masterWebUiUrl: String)
@@ -112,6 +135,7 @@ private[deploy] object DeployMessages {
   // Master to MasterWebUI
   case class MasterStateResponse(host: String, port: Int, workers: Array[WorkerInfo],
+    activeDrivers: Array[DriverInfo], completedDrivers: Array[DriverInfo],
     activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo],
     status: MasterState) {
@@ -128,7 +152,8 @@ private[deploy] object DeployMessages {
   // Worker to WorkerWebUI
   case class WorkerStateResponse(host: String, port: Int, workerId: String,
-    executors: List[ExecutorRunner], finishedExecutors: List[ExecutorRunner], masterUrl: String,
+    executors: List[ExecutorRunner], finishedExecutors: List[ExecutorRunner],
+    drivers: List[DriverRunner], finishedDrivers: List[DriverRunner], masterUrl: String,
     cores: Int, memory: Int, coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String) {
     Utils.checkHost(host, "Required hostname")
diff --git a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
new file mode 100644
index 0000000000..52f6b1b554
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+private[spark] class DriverDescription(
+    val jarUrl: String,
+    val mainClass: String,
+    val mem: Integer) // TODO: Should this be Long?
+  extends Serializable {
+
+  override def toString: String = s"DriverDescription ($mainClass)"
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index c5a0d1fd01..737ba09117 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -147,7 +147,7 @@ private[spark] class AppClient(
         logWarning(s"Connection to $address failed; waiting for master to reconnect...")
         markDisconnected()
-      case StopClient =>
+      case StopAppClient =>
         markDead()
         sender ! true
         context.stop(self)
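The `DriverClient <-> Master` messages above form a simple request/response pair per operation. A hedged sketch of a master-side stub that speaks just this slice of the protocol (the real handlers appear in `Master.scala` below; this stand-in is for illustration only):

```scala
import akka.actor.Actor
import org.apache.spark.deploy.DeployMessages._

// Toy actor accepting every submission and kill request, mirroring the message shapes.
class DriverProtocolStub extends Actor {
  def receive = {
    case RequestSubmitDriver(desc) =>
      sender ! SubmitDriverResponse(success = true, s"Accepted ${desc.mainClass}")
    case RequestKillDriver(driverId) =>
      sender ! KillDriverResponse(success = true, s"Kill request for $driverId submitted")
  }
}
```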
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
new file mode 100644
index 0000000000..482bafd0e0
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.client
+
+import akka.actor._
+import akka.remote.{RemotingLifecycleEvent}
+
+import org.apache.spark.{SparkException, Logging}
+import org.apache.spark.deploy.{DeployMessage, DriverDescription}
+import org.apache.spark.deploy.DeployMessages._
+import org.apache.spark.deploy.master.{MasterArguments, Master}
+import akka.pattern.ask
+
+import org.apache.spark.util.{Utils, AkkaUtils}
+import scala.concurrent.duration.{FiniteDuration, Duration}
+import java.util.concurrent.TimeUnit
+import akka.util.Timeout
+import scala.concurrent.Await
+import akka.actor.Actor.emptyBehavior
+
+/**
+ * Parent class for actors that send a single message to the standalone master and then die.
+ */
+private[spark] abstract class SingleMessageClient(
+    actorSystem: ActorSystem, master: String, message: DeployMessage)
+  extends Logging {
+
+  // Concrete child classes must implement
+  def handleResponse(response: Any)
+
+  var actor: ActorRef = actorSystem.actorOf(Props(new DriverActor()))
+
+  class DriverActor extends Actor with Logging {
+    override def preStart() {
+      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+      logInfo("Sending message to master " + master + "...")
+      val masterActor = context.actorSelection(Master.toAkkaUrl(master))
+      val timeoutDuration: FiniteDuration = Duration.create(
+        System.getProperty("spark.akka.askTimeout", "10").toLong, TimeUnit.SECONDS)
+      val submitFuture = masterActor.ask(message)(timeoutDuration)
+      handleResponse(Await.result(submitFuture, timeoutDuration))
+      actorSystem.stop(actor)
+      actorSystem.shutdown()
+    }
+
+    override def receive = emptyBehavior
+  }
+}
+
+/**
+ * Submits a driver to the master.
+ */
+private[spark] class SubmissionClient(actorSystem: ActorSystem, master: String,
+    driverDescription: DriverDescription)
+  extends SingleMessageClient(actorSystem, master, RequestSubmitDriver(driverDescription)) {
+
+  override def handleResponse(response: Any) {
+    val resp = response.asInstanceOf[SubmitDriverResponse]
+    if (!resp.success) {
+      logError(s"Error submitting driver to $master")
+      logError(resp.message)
+    }
+  }
+}
+
+/**
+ * Terminates a client at the master.
+ */
+private[spark] class TerminationClient(actorSystem: ActorSystem, master: String, driverId: String)
+  extends SingleMessageClient(actorSystem, master, RequestKillDriver(driverId)) {
+
+  override def handleResponse(response: Any) {
+    val resp = response.asInstanceOf[KillDriverResponse]
+    if (!resp.success) {
+      logError(s"Error terminating $driverId at $master")
+      logError(resp.message)
+    }
+  }
+}
+
+/**
+ * Callable utility for starting and terminating drivers inside of the standalone scheduler.
+ */
+object DriverClient {
+
+  def main(args: Array[String]) {
+    if (args.size < 3) {
+      println("usage: DriverClient launch <active-master> <jar-url> <main-class>")
+      println("usage: DriverClient kill <active-master> <driver-id>")
+      System.exit(-1)
+    }
+
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
+      "driverSubmission", Utils.localHostName(), 0)
+
+    // TODO Should be configurable
+    val mem = 512
+
+    args(0) match {
+      case "launch" =>
+        val master = args(1)
+        val jarUrl = args(2)
+        val mainClass = args(3)
+        val driverDescription = new DriverDescription(jarUrl, mainClass, mem)
+        val client = new SubmissionClient(actorSystem, master, driverDescription)
+
+      case "kill" =>
+        val master = args(1)
+        val driverId = args(2)
+        val client = new TerminationClient(actorSystem, master, driverId)
+    }
+    actorSystem.awaitTermination()
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
new file mode 100644
index 0000000000..69d150aea5
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import org.apache.spark.deploy.{DriverDescription, ApplicationDescription}
+import java.util.Date
+import akka.actor.ActorRef
+import scala.collection.mutable
+
+private[spark] class DriverInfo(
+    val startTime: Long,
+    val id: String,
+    val desc: DriverDescription,
+    val submitDate: Date)
+  extends Serializable {
+
+  @transient var state: DriverState.Value = DriverState.SUBMITTED
+  /* If we fail when launching the driver, the exception is stored here. */
+  @transient var exception: Option[Exception] = None
+  /* Most recent worker assigned to this driver */
+  @transient var worker: Option[WorkerInfo] = None
+
+}
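`DriverClient.main` is the command-line entry point for both operations. Two illustrative invocations (master URL, jar path, and driver ID are placeholders; note that `launch` actually consumes four arguments, so the `args.size < 3` guard lets a missing main class slip through to an `ArrayIndexOutOfBoundsException`):

```scala
import org.apache.spark.deploy.client.DriverClient

// Equivalent of: DriverClient launch spark://host:7077 hdfs://nn:8020/jars/app.jar com.example.Main
DriverClient.main(Array("launch", "spark://host:7077", "hdfs://nn:8020/jars/app.jar", "com.example.Main"))

// Equivalent of: DriverClient kill spark://host:7077 driver-20131225120000-0000
// Each call blocks until the single-message actor system shuts itself down.
DriverClient.main(Array("kill", "spark://host:7077", "driver-20131225120000-0000"))
```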
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/DriverState.scala b/core/src/main/scala/org/apache/spark/deploy/master/DriverState.scala
new file mode 100644
index 0000000000..230dab1a19
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/DriverState.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+private[spark] object DriverState extends Enumeration {
+
+  type DriverState = Value
+
+  // SUBMITTED: Submitted but not yet scheduled on a worker
+  // RUNNING: Has been allocated to a worker to run
+  // FINISHED: Previously ran and exited cleanly
+  // RELAUNCHING: Exited non-zero or due to worker failure, but has not yet started running again
+  // UNKNOWN: The state of the driver is temporarily not known due to master failure recovery
+  // KILLED: A user manually killed this driver
+  // FAILED: Unable to run due to an unrecoverable error (e.g. missing jar file)
+  val SUBMITTED, RUNNING, FINISHED, RELAUNCHING, UNKNOWN, KILLED, FAILED = Value
+
+  val MAX_NUM_RETRY = 10
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
index 043945a211..44a046b4a4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
@@ -47,6 +47,15 @@ private[spark] class FileSystemPersistenceEngine(
     new File(dir + File.separator + "app_" + app.id).delete()
   }
+  override def addDriver(driver: DriverInfo) {
+    val driverFile = new File(dir + File.separator + "driver_" + driver.id)
+    serializeIntoFile(driverFile, driver)
+  }
+
+  override def removeDriver(driver: DriverInfo) {
+    new File(dir + File.separator + "driver_" + driver.id).delete()
+  }
+
   override def addWorker(worker: WorkerInfo) {
     val workerFile = new File(dir + File.separator + "worker_" + worker.id)
     serializeIntoFile(workerFile, worker)
@@ -56,13 +65,15 @@ private[spark] class FileSystemPersistenceEngine(
     new File(dir + File.separator + "worker_" + worker.id).delete()
   }
-  override def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo]) = {
+  override def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
     val sortedFiles = new File(dir).listFiles().sortBy(_.getName)
     val appFiles = sortedFiles.filter(_.getName.startsWith("app_"))
     val apps = appFiles.map(deserializeFromFile[ApplicationInfo])
+    val driverFiles = sortedFiles.filter(_.getName.startsWith("driver_"))
+    val drivers = driverFiles.map(deserializeFromFile[DriverInfo])
     val workerFiles = sortedFiles.filter(_.getName.startsWith("worker_"))
     val workers = workerFiles.map(deserializeFromFile[WorkerInfo])
-    (apps, workers)
+    (apps, drivers, workers)
   }
   private def serializeIntoFile(file: File, value: AnyRef) {
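`DriverState` enumerates the lifecycle documented in the comments above; only `FINISHED`, `KILLED`, and `FAILED` are terminal (the master's `DriverStateChanged` handler below rejects anything else). A small helper capturing that split, assuming nothing beyond the enumeration itself:

```scala
import org.apache.spark.deploy.master.DriverState
import org.apache.spark.deploy.master.DriverState._

// Hypothetical predicate (not in the commit): which states end a driver's lifecycle.
def isTerminal(state: DriverState): Boolean = state match {
  case FINISHED | KILLED | FAILED => true
  case SUBMITTED | RUNNING | RELAUNCHING | UNKNOWN => false
}
```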
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index eebd0794b8..76af332986 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -30,7 +30,7 @@ import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 import akka.serialization.SerializationExtension
 import org.apache.spark.{Logging, SparkException}
-import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
+import org.apache.spark.deploy.{DriverDescription, ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.MasterMessages._
 import org.apache.spark.deploy.master.ui.MasterWebUI
@@ -47,7 +47,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   val RECOVERY_DIR = System.getProperty("spark.deploy.recoveryDirectory", "")
   val RECOVERY_MODE = System.getProperty("spark.deploy.recoveryMode", "NONE")
-  var nextAppNumber = 0
   val workers = new HashSet[WorkerInfo]
   val idToWorker = new HashMap[String, WorkerInfo]
   val actorToWorker = new HashMap[ActorRef, WorkerInfo]
@@ -57,9 +56,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   val idToApp = new HashMap[String, ApplicationInfo]
   val actorToApp = new HashMap[ActorRef, ApplicationInfo]
   val addressToApp = new HashMap[Address, ApplicationInfo]
-
   val waitingApps = new ArrayBuffer[ApplicationInfo]
   val completedApps = new ArrayBuffer[ApplicationInfo]
+  var nextAppNumber = 0
+
+  val drivers = new HashSet[DriverInfo]
+  val completedDrivers = new ArrayBuffer[DriverInfo]
+  val waitingDrivers = new ArrayBuffer[DriverInfo] // Drivers currently spooled for scheduling
+  var nextDriverNumber = 0
   Utils.checkHost(host, "Expected hostname")
@@ -134,14 +138,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   override def receive = {
     case ElectedLeader => {
-      val (storedApps, storedWorkers) = persistenceEngine.readPersistedData()
-      state = if (storedApps.isEmpty && storedWorkers.isEmpty)
+      val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData()
+      state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty)
         RecoveryState.ALIVE
       else
         RecoveryState.RECOVERING
       logInfo("I have been elected leader! New state: " + state)
       if (state == RecoveryState.RECOVERING) {
-        beginRecovery(storedApps, storedWorkers)
+        beginRecovery(storedApps, storedDrivers, storedWorkers)
         context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() }
       }
     }
@@ -168,6 +172,52 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       }
     }
+    case RequestSubmitDriver(description) => {
+      if (state == RecoveryState.STANDBY) {
+        sender ! SubmitDriverResponse(false, "Standby master cannot accept driver submission")
+      } else {
+        logInfo("Driver submitted " + description.mainClass)
+        val driver = createDriver(description)
+        persistenceEngine.addDriver(driver)
+        waitingDrivers += driver
+        drivers.add(driver)
+        schedule()
+
+        // TODO: It might be good to instead have the submission client poll the master to determine
+        //       the current status of the driver. Since we may already want to expose this.
+
+        sender ! SubmitDriverResponse(true, "Driver successfully submitted")
+      }
+    }
+
+    case RequestKillDriver(driverId) => {
+      if (state == RecoveryState.STANDBY) {
+        sender ! KillDriverResponse(false, "Standby master cannot kill drivers")
+      } else {
+        logInfo("Asked to kill driver " + driverId)
+        val driver = drivers.find(_.id == driverId)
+        driver match {
+          case Some(d) =>
+            if (waitingDrivers.contains(d)) { waitingDrivers -= d }
+            else {
+              // We just notify the worker to kill the driver here. The final bookkeeping occurs
+              // on the return path when the worker submits a state change back to the master
+              // to notify it that the driver was successfully killed.
+              d.worker.foreach { w =>
+                w.actor ! KillDriver(driverId)
+              }
+            }
+            val msg = s"Kill request for $driverId submitted"
+            logInfo(msg)
+            sender ! KillDriverResponse(true, msg)
+          case None =>
+            val msg = s"Could not find running driver $driverId"
+            logWarning(msg)
+            sender ! KillDriverResponse(false, msg)
+        }
+      }
+    }
+
     case RegisterApplication(description) => {
       if (state == RecoveryState.STANDBY) {
         // ignore, don't send response
@@ -210,6 +260,25 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       }
     }
+    case DriverStateChanged(driverId, state, exception) => {
+      if (!(state == DriverState.FAILED || state == DriverState.FINISHED ||
+        state == DriverState.KILLED)) {
+        throw new Exception(s"Received unexpected state update for driver $driverId: $state")
+      }
+      drivers.find(_.id == driverId) match {
+        case Some(driver) => {
+          drivers -= driver
+          completedDrivers += driver
+          persistenceEngine.removeDriver(driver)
+          driver.state = state
+          driver.exception = exception
+          driver.worker.foreach(w => w.removeDriver(driver))
+        }
+        case None =>
+          logWarning(s"Got driver update for unknown driver $driverId")
+      }
+    }
+
     case Heartbeat(workerId) => {
       idToWorker.get(workerId) match {
         case Some(workerInfo) =>
@@ -231,7 +300,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       if (canCompleteRecovery) { completeRecovery() }
     }
-    case WorkerSchedulerStateResponse(workerId, executors) => {
+    case WorkerSchedulerStateResponse(workerId, executors, driverIds) => {
       idToWorker.get(workerId) match {
         case Some(worker) =>
           logInfo("Worker has been re-registered: " + workerId)
@@ -244,6 +313,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
             worker.addExecutor(execInfo)
             execInfo.copyState(exec)
           }
+
+          for (driverId <- driverIds) {
+            drivers.find(_.id == driverId).foreach { driver =>
+              driver.worker = Some(worker)
+              driver.state = DriverState.RUNNING
+              worker.drivers(driverId) = driver
+            }
+          }
         case None =>
           logWarning("Scheduler state from unknown worker: " + workerId)
       }
@@ -260,8 +337,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     }
     case RequestMasterState => {
-      sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray,
-        state)
+      sender ! MasterStateResponse(host, port, workers.toArray, drivers.toArray,
+        completedDrivers.toArray, apps.toArray, completedApps.toArray, state)
     }
     case CheckForWorkerTimeOut => {
@@ -277,7 +354,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     workers.count(_.state == WorkerState.UNKNOWN) == 0 &&
       apps.count(_.state == ApplicationState.UNKNOWN) == 0
-  def beginRecovery(storedApps: Seq[ApplicationInfo], storedWorkers: Seq[WorkerInfo]) {
+  def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
+      storedWorkers: Seq[WorkerInfo]) {
     for (app <- storedApps) {
       logInfo("Trying to recover app: " + app.id)
       try {
@@ -289,6 +367,12 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       }
     }
+    for (driver <- storedDrivers) {
+      // Here we just read in the list of drivers. Any drivers associated with now-lost workers
+      // will be re-launched when we detect that the worker is missing.
+      drivers += driver
+    }
+
     for (worker <- storedWorkers) {
       logInfo("Trying to recover worker: " + worker.id)
       try {
@@ -312,6 +396,12 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
     apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)
+    // Reschedule drivers which were not claimed by any workers
+    drivers.filter(_.worker.isEmpty).foreach { d =>
+      logWarning(s"Driver ${d.id} was not found after master recovery, re-launching")
+      relaunchDriver(d)
+    }
+
     state = RecoveryState.ALIVE
     schedule()
     logInfo("Recovery complete - resuming operations!")
@@ -332,6 +422,16 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   */
  def schedule() {
    if (state != RecoveryState.ALIVE) { return }
+    // First schedule drivers, they take strict precedence over applications
+    for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
+      for (driver <- Seq(waitingDrivers: _*)) {
+        if (worker.memoryFree > driver.desc.mem) {
+          launchDriver(worker, driver)
+          waitingDrivers -= driver
+        }
+      }
+    }
+
     // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
     // in the queue, then the second app, etc.
     if (spreadOutApps) {
@@ -418,9 +518,19 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
         exec.id, ExecutorState.LOST, Some("worker lost"), None)
       exec.application.removeExecutor(exec)
     }
+    for (driver <- worker.drivers.values) {
+      relaunchDriver(driver)
+    }
     persistenceEngine.removeWorker(worker)
   }
+  def relaunchDriver(driver: DriverInfo) {
+    driver.worker = None
+    driver.state = DriverState.RELAUNCHING
+    waitingDrivers += driver
+    schedule()
+  }
+
   def createApplication(desc: ApplicationDescription, driver: ActorRef): ApplicationInfo = {
     val now = System.currentTimeMillis()
     val date = new Date(now)
@@ -499,6 +609,28 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       }
     }
   }
+
+  /** Generate a new driver ID given a driver's submission date */
+  def newDriverId(submitDate: Date): String = {
+    val appId = "driver-%s-%04d".format(DATE_FORMAT.format(submitDate), nextDriverNumber)
+    nextDriverNumber += 1
+    appId
+  }
+
+  def createDriver(desc: DriverDescription): DriverInfo = {
+    val now = System.currentTimeMillis()
+    val date = new Date(now)
+    new DriverInfo(now, newDriverId(date), desc, date)
+  }
+
+  def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
+    logInfo("Launching driver " + driver.id + " on worker " + worker.id)
+    worker.addDriver(driver)
+    driver.worker = Some(worker)
+    worker.actor ! LaunchDriver(driver.id, driver.desc.jarUrl, driver.desc.mainClass,
+      driver.desc.mem)
+    driver.state = DriverState.RUNNING
+  }
 }
 private[spark] object Master {
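The driver-scheduling pass added to `Master.schedule()` is deliberately simple: every `ALIVE` worker with free capacity walks the waiting list and claims any driver whose memory fits (note the strict `>` comparison, and the `Seq(waitingDrivers: _*)` snapshot that allows removal during iteration). The same logic in isolation, with stand-in types instead of `WorkerInfo`/`DriverInfo`:

```scala
import scala.collection.mutable

// Simplified stand-ins for WorkerInfo and DriverInfo.
case class W(id: String, var memoryFree: Int)
case class D(id: String, mem: Int)

// Returns (driverId, workerId) placements, decrementing free memory as drivers are claimed
// (in the real code, launchDriver -> worker.addDriver does this accounting).
def scheduleDrivers(workers: Seq[W], waiting: mutable.Buffer[D]): Seq[(String, String)] = {
  val placements = mutable.Buffer[(String, String)]()
  for (worker <- workers) {
    for (driver <- waiting.toList) { // snapshot, like Seq(waitingDrivers: _*)
      if (worker.memoryFree > driver.mem) {
        worker.memoryFree -= driver.mem
        waiting -= driver
        placements += ((driver.id, worker.id))
      }
    }
  }
  placements.toSeq
}
```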
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
index 94b986caf2..e3640ea4f7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
@@ -35,11 +35,15 @@
   def removeWorker(worker: WorkerInfo)
+  def addDriver(driver: DriverInfo)
+
+  def removeDriver(driver: DriverInfo)
+
   /**
    * Returns the persisted data sorted by their respective ids (which implies that they're
    * sorted by time of creation).
    */
-  def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo])
+  def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo])
   def close() {}
 }
@@ -49,5 +53,8 @@ private[spark] class BlackHolePersistenceEngine extends PersistenceEngine {
   override def removeApplication(app: ApplicationInfo) {}
   override def addWorker(worker: WorkerInfo) {}
   override def removeWorker(worker: WorkerInfo) {}
-  override def readPersistedData() = (Nil, Nil)
+  override def addDriver(driver: DriverInfo) {}
+  override def removeDriver(driver: DriverInfo) {}
+
+  override def readPersistedData() = (Nil, Nil, Nil)
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
index e05f587b58..27c2ff4b8c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -36,6 +36,7 @@ private[spark] class WorkerInfo(
   assert (port > 0)
   @transient var executors: mutable.HashMap[String, ExecutorInfo] = _ // fullId => info
+  @transient var drivers: mutable.HashMap[String, DriverInfo] = _
   @transient var state: WorkerState.Value = _
   @transient var coresUsed: Int = _
   @transient var memoryUsed: Int = _
@@ -54,6 +55,7 @@
   private def init() {
     executors = new mutable.HashMap
+    drivers = new mutable.HashMap
     state = WorkerState.ALIVE
     coresUsed = 0
     memoryUsed = 0
@@ -83,6 +85,18 @@
     executors.values.exists(_.application == app)
   }
+  def addDriver(driver: DriverInfo) {
+    drivers(driver.id) = driver
+    memoryUsed += driver.desc.mem
+    coresUsed += 1
+  }
+
+  def removeDriver(driver: DriverInfo) {
+    drivers -= driver.id
+    memoryUsed -= driver.desc.mem
+    coresUsed -= 1
+  }
+
   def webUiAddress : String = {
     "http://" + this.publicAddress + ":" + this.webUiPort
   }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
index 825344b3bb..52df173850 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -49,6 +49,14 @@ class ZooKeeperPersistenceEngine(serialization: Serialization)
     zk.delete(WORKING_DIR + "/app_" + app.id)
   }
+  override def addDriver(driver: DriverInfo) {
+    serializeIntoFile(WORKING_DIR + "/driver_" + driver.id, driver)
+  }
+
+  override def removeDriver(driver: DriverInfo) {
+    zk.delete(WORKING_DIR + "/driver_" + driver.id)
+  }
+
   override def addWorker(worker: WorkerInfo) {
     serializeIntoFile(WORKING_DIR + "/worker_" + worker.id, worker)
   }
@@ -61,13 +69,15 @@
     zk.close()
   }
-  override def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo]) = {
+  override def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
     val sortedFiles = zk.getChildren(WORKING_DIR).toList.sorted
     val appFiles = sortedFiles.filter(_.startsWith("app_"))
     val apps = appFiles.map(deserializeFromFile[ApplicationInfo])
+    val driverFiles = sortedFiles.filter(_.startsWith("driver_"))
+    val drivers = driverFiles.map(deserializeFromFile[DriverInfo])
     val workerFiles = sortedFiles.filter(_.startsWith("worker_"))
     val workers = workerFiles.map(deserializeFromFile[WorkerInfo])
-    (apps, workers)
+    (apps, drivers, workers)
   }
   private def serializeIntoFile(path: String, value: AnyRef) {
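All three engines now persist drivers alongside applications and workers, and `readPersistedData()` returns a three-way tuple. A hypothetical in-memory implementation of the extended trait (not part of the commit), the kind of thing a recovery unit test might use; it assumes placement in the same package so the `private[spark]` members resolve:

```scala
package org.apache.spark.deploy.master

import scala.collection.mutable

private[spark] class InMemoryPersistenceEngine extends PersistenceEngine {
  private val apps = mutable.LinkedHashMap[String, ApplicationInfo]()
  private val drivers = mutable.LinkedHashMap[String, DriverInfo]()
  private val workers = mutable.LinkedHashMap[String, WorkerInfo]()

  override def addApplication(app: ApplicationInfo) { apps(app.id) = app }
  override def removeApplication(app: ApplicationInfo) { apps -= app.id }
  override def addDriver(driver: DriverInfo) { drivers(driver.id) = driver }
  override def removeDriver(driver: DriverInfo) { drivers -= driver.id }
  override def addWorker(worker: WorkerInfo) { workers(worker.id) = worker }
  override def removeWorker(worker: WorkerInfo) { workers -= worker.id }

  // Insertion order stands in for the id-sorted order the trait documents.
  override def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) =
    (apps.values.toSeq, drivers.values.toSeq, workers.values.toSeq)
}
```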
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
index 4ef762892c..13903b4a1d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
@@ -26,7 +26,8 @@ import net.liftweb.json.JsonAST.JValue
 import org.apache.spark.deploy.{DeployWebUI, JsonProtocol}
 import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
-import org.apache.spark.deploy.master.{ApplicationInfo, WorkerInfo}
+import org.apache.spark.deploy.JsonProtocol
+import org.apache.spark.deploy.master.{DriverInfo, ApplicationInfo, WorkerInfo}
 import org.apache.spark.ui.UIUtils
 import org.apache.spark.util.Utils
@@ -56,6 +57,12 @@ private[spark] class IndexPage(parent: MasterWebUI) {
     val completedApps = state.completedApps.sortBy(_.endTime).reverse
     val completedAppsTable = UIUtils.listingTable(appHeaders, appRow, completedApps)
+    val driverHeaders = Seq("ID", "Submitted Time", "Worker", "State", "Memory", "Main Class")
+    val activeDrivers = state.activeDrivers.sortBy(_.startTime).reverse
+    val activeDriversTable = UIUtils.listingTable(driverHeaders, driverRow, activeDrivers)
+    val completedDrivers = state.completedDrivers.sortBy(_.startTime).reverse
+    val completedDriversTable = UIUtils.listingTable(driverHeaders, driverRow, completedDrivers)
+
     val content =
         <div class="row-fluid">
           <div class="span12">
@@ -70,6 +77,9 @@ private[spark] class IndexPage(parent: MasterWebUI) {
               <li><strong>Applications:</strong>
                 {state.activeApps.size} Running,
                 {state.completedApps.size} Completed </li>
+              <li><strong>Drivers:</strong>
+                {state.activeDrivers.size} Running,
+                {state.completedDrivers.size} Completed </li>
             </ul>
           </div>
         </div>
@@ -94,7 +104,22 @@ private[spark] class IndexPage(parent: MasterWebUI) {
             <h4> Completed Applications </h4>
             {completedAppsTable}
           </div>
-        </div>;
+        </div>
+
+        <div class="row-fluid">
+          <div class="span12">
+            <h4> Active Drivers </h4>
+
+            {activeDriversTable}
+          </div>
+        </div>
+
+        <div class="row-fluid">
+          <div class="span12">
+            <h4> Completed Drivers </h4>
+            {completedDriversTable}
+          </div>
+        </div>;
     UIUtils.basicSparkPage(content, "Spark Master at " + state.uri)
   }
@@ -134,4 +159,17 @@ private[spark] class IndexPage(parent: MasterWebUI) {
       <td>{DeployWebUI.formatDuration(app.duration)}</td>
     </tr>
   }
+
+  def driverRow(driver: DriverInfo): Seq[Node] = {
+    <tr>
+      <td>{driver.id} </td>
+      <td>{driver.submitDate}</td>
+      <td>{driver.worker.map(w => w.id.toString).getOrElse("None")}</td>
+      <td>{driver.state}</td>
+      <td sorttable_customkey={driver.desc.mem.toString}>
+        {Utils.megabytesToString(driver.desc.mem.toLong)}
+      </td>
+      <td>{driver.desc.mainClass}</td>
+    </tr>
+  }
 }
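The master web UI renders drivers with the same header/row pattern used for applications. A self-contained sketch of that row-building idiom using plain `scala.xml`, with a stubbed record type in place of `DriverInfo`:

```scala
import scala.xml.Node

// Stand-in for the fields driverRow reads off DriverInfo.
case class DriverSummary(id: String, worker: Option[String], state: String, memMb: Int, mainClass: String)

def driverSummaryRow(d: DriverSummary): Seq[Node] = {
  <tr>
    <td>{d.id}</td>
    <td>{d.worker.getOrElse("None")}</td>
    <td>{d.state}</td>
    <td>{d.memMb} MB</td>
    <td>{d.mainClass}</td>
  </tr>
}
```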
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
new file mode 100644
index 0000000000..fccc36b660
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.worker
+
+import java.io._
+
+import com.google.common.base.Charsets
+import com.google.common.io.Files
+
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+import org.apache.hadoop.fs.{FileUtil, Path}
+import org.apache.hadoop.conf.Configuration
+import akka.actor.{ActorRef, ActorSelection}
+import org.apache.spark.deploy.DeployMessages.DriverStateChanged
+import org.apache.spark.deploy.master.DriverState
+
+/**
+ * Manages the execution of one driver process.
+ */
+private[spark] class DriverRunner(
+    val driverId: String,
+    val jarUrl: String,
+    val mainClass: String,
+    val workDir: File,
+    val memory: Int,
+    val worker: ActorRef)
+  extends Logging {
+
+  var process: Option[Process] = None
+  @volatile var killed = false
+
+  /** Starts a thread to run and manage the driver. */
+  def start() = {
+    new Thread("DriverRunner for " + driverId) {
+      override def run() {
+        var exn: Option[Exception] = None
+
+        try {
+          val driverDir = createWorkingDirectory()
+          val localJarFilename = downloadUserJar(driverDir)
+          val command = Seq("java", "-cp", localJarFilename, mainClass)
+          runCommandWithRetry(command, driverDir)
+        }
+        catch {
+          case e: Exception => exn = Some(e)
+        }
+
+        val finalState =
+          if (killed) { DriverState.KILLED }
+          else if (exn.isDefined) { DriverState.FAILED }
+          else { DriverState.FINISHED }
+
+        worker ! DriverStateChanged(driverId, finalState, exn)
+      }
+    }.start()
+  }
+
+  /** Terminate this driver (or prevent it from ever starting if not yet started) */
+  def kill() {
+    killed = true
+    process.foreach(p => p.destroy())
+  }
+
+  /** Spawn a thread that will redirect a given stream to a file */
+  def redirectStream(in: InputStream, file: File) {
+    val out = new FileOutputStream(file, true)
+    new Thread("redirect output to " + file) {
+      override def run() {
+        try {
+          Utils.copyStream(in, out, true)
+        } catch {
+          case e: IOException =>
+            logInfo("Redirection to " + file + " closed: " + e.getMessage)
+        }
+      }
+    }.start()
+  }
+
+  /**
+   * Creates the working directory for this driver.
+   * Will throw an exception if there are errors preparing the directory.
+   */
+  def createWorkingDirectory(): File = {
+    val driverDir = new File(workDir, driverId)
+    if (!driverDir.exists() && !driverDir.mkdirs()) {
+      throw new IOException("Failed to create directory " + driverDir)
+    }
+    driverDir
+  }
+
+  /**
+   * Download the user jar into the supplied directory and return its local path.
+   * Will throw an exception if there are errors downloading the jar.
+   */
+  def downloadUserJar(driverDir: File): String = {
+
+    val jarPath = new Path(jarUrl)
+
+    val emptyConf = new Configuration() // TODO: In docs explain it needs to be full HDFS path
+    val jarFileSystem = jarPath.getFileSystem(emptyConf)
+
+    val destPath = new Path(driverDir.getAbsolutePath())
+    val destFileSystem = destPath.getFileSystem(emptyConf)
+    val jarFileName = jarPath.getName
+    val localJarFile = new File(driverDir, jarFileName)
+    val localJarFilename = localJarFile.getAbsolutePath
+
+    if (!localJarFile.exists()) { // May already exist if running multiple workers on one node
+      logInfo(s"Copying user jar $jarPath to $destPath")
+      FileUtil.copy(jarFileSystem, jarPath, destFileSystem, destPath, false, false, emptyConf)
+    }
+
+    if (!localJarFile.exists()) { // Verify copy succeeded
+      throw new Exception(s"Did not see expected jar $jarFileName in $driverDir")
+    }
+
+    localJarFilename
+  }
+
+  /** Continue launching the supplied command until it exits zero. */
+  def runCommandWithRetry(command: Seq[String], baseDir: File) = {
+    /* Time to wait between submission retries. */
+    var waitSeconds = 1
+
+    // TODO: We should distinguish between "immediate" exits and cases where it was running
+    //       for a long time and then exits.
+    var cleanExit = false
+
+    while (!cleanExit && !killed) {
+      Thread.sleep(waitSeconds * 1000)
+
+      val builder = new ProcessBuilder(command: _*).directory(baseDir)
+      logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
+
+      process = Some(builder.start())
+
+      // Redirect stdout and stderr to files
+      val stdout = new File(baseDir, "stdout")
+      redirectStream(process.get.getInputStream, stdout)
+
+      val stderr = new File(baseDir, "stderr")
+      val header = "Driver Command: %s\n%s\n\n".format(
+        command.mkString("\"", "\" \"", "\""), "=" * 40)
+      Files.write(header, stderr, Charsets.UTF_8)
+      redirectStream(process.get.getErrorStream, stderr)
+
+      val exitCode =
+        /* There is a race here I've elected to ignore for now because it's very unlikely and not
+         * simple to fix. This could see `killed=false` then the main thread gets a kill request
+         * and sets `killed=true` and destroys the not-yet-started process, then this thread
+         * launches the process. For now, in that case the user can just re-submit the kill
+         * request.
+         */
+        if (killed) -1
+        else process.get.waitFor()
+
+      cleanExit = exitCode == 0
+      if (!cleanExit && !killed) {
+        waitSeconds = waitSeconds * 2 // exponential back-off
+        logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")
+      }
+    }
+  }
+}
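`runCommandWithRetry` sleeps, launches, and doubles the wait after every non-zero exit until the process exits cleanly or the driver is killed. The back-off policy as a pure function, useful for checking the schedule without spawning processes:

```scala
// Given the exit codes of successive runs, return the wait (in seconds) before each run.
def backoffSchedule(exitCodes: List[Int]): List[Int] = {
  def loop(codes: List[Int], wait: Int, acc: List[Int]): List[Int] = codes match {
    case Nil => acc.reverse
    case 0 :: _ => (wait :: acc).reverse // clean exit: no further retries
    case _ :: rest => loop(rest, wait * 2, wait :: acc)
  }
  loop(exitCodes, 1, Nil)
}

// Three failures then a clean run wait 1s, 2s, 4s, 8s respectively:
// backoffSchedule(List(1, 1, 1, 0)) == List(1, 2, 4, 8)
```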
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 87531b6719..a2b491a72f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -30,18 +30,10 @@ import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 import org.apache.spark.{SparkException, Logging}
 import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
-import org.apache.spark.deploy.master.Master
+import org.apache.spark.deploy.master.{DriverState, Master}
 import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.util.{Utils, AkkaUtils}
-import org.apache.spark.deploy.DeployMessages.WorkerStateResponse
-import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
-import org.apache.spark.deploy.DeployMessages.KillExecutor
-import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
-import org.apache.spark.deploy.DeployMessages.Heartbeat
-import org.apache.spark.deploy.DeployMessages.RegisteredWorker
-import org.apache.spark.deploy.DeployMessages.LaunchExecutor
-import org.apache.spark.deploy.DeployMessages.RegisterWorker
 /**
  * @param masterUrls Each url should look like spark://host:port.
@@ -83,6 +75,9 @@ private[spark] class Worker(
   var workDir: File = null
   val executors = new HashMap[String, ExecutorRunner]
   val finishedExecutors = new HashMap[String, ExecutorRunner]
+  val drivers = new HashMap[String, DriverRunner]
+  val finishedDrivers = new HashMap[String, DriverRunner]
+
   val publicAddress = {
     val envVar = System.getenv("SPARK_PUBLIC_DNS")
     if (envVar != null) envVar else host
@@ -193,7 +188,7 @@ private[spark] class Worker(
       val execs = executors.values.
         map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
-      sender ! WorkerSchedulerStateResponse(workerId, execs.toList)
+      sender ! WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq)
     case RegisterWorkerFailed(message) =>
       if (!registered) {
@@ -247,13 +242,56 @@ private[spark] class Worker(
       }
     }
+    case LaunchDriver(driverId, jarUrl, mainClass, memory) => {
+      logInfo(s"Asked to launch driver $driverId")
+      val driver = new DriverRunner(driverId, jarUrl, mainClass, workDir, memory, self)
+      drivers(driverId) = driver
+      driver.start()
+
+      coresUsed += 1
+      memoryUsed += memory
+    }
+
+    case KillDriver(driverId) => {
+      logInfo(s"Asked to kill driver $driverId")
+
+      drivers.find(_._1 == driverId) match {
+        case Some((id, runner)) =>
+          runner.kill()
+        case None =>
+          logError(s"Asked to kill unknown driver $driverId")
+      }
+
+    }
+
+    case DriverStateChanged(driverId, state, exception) => {
+      state match {
+        case DriverState.FAILED =>
+          logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
+        case DriverState.FINISHED =>
+          logInfo(s"Driver $driverId exited successfully")
+        case DriverState.KILLED =>
+          logInfo(s"Driver $driverId was killed")
+      }
+
+      masterLock.synchronized {
+        master ! DriverStateChanged(driverId, state, exception)
+      }
+
+      val driver = drivers(driverId)
+      memoryUsed -= driver.memory
+      coresUsed -= 1
+      drivers -= driverId
+      finishedDrivers(driverId) = driver
+    }
+
     case x: DisassociatedEvent if x.remoteAddress == masterAddress =>
       logInfo(s"$x Disassociated !")
       masterDisconnected()
     case RequestWorkerState => {
       sender ! WorkerStateResponse(host, port, workerId, executors.values.toList,
-        finishedExecutors.values.toList, activeMasterUrl, cores, memory,
+        finishedExecutors.values.toList, drivers.values.toList,
+        finishedDrivers.values.toList, activeMasterUrl, cores, memory,
         coresUsed, memoryUsed, activeMasterWebUiUrl)
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
index 0d59048313..e233b82585 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
@@ -30,7 +30,7 @@ import net.liftweb.json.JsonAST.JValue
 import org.apache.spark.deploy.JsonProtocol
 import org.apache.spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse}
-import org.apache.spark.deploy.worker.ExecutorRunner
+import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
 import org.apache.spark.ui.UIUtils
 import org.apache.spark.util.Utils
@@ -56,6 +56,12 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
     val finishedExecutorTable =
       UIUtils.listingTable(executorHeaders, executorRow, workerState.finishedExecutors)
+    val driverHeaders = Seq("DriverID", "Main Class", "Memory", "Logs")
+    val runningDriverTable =
+      UIUtils.listingTable(driverHeaders, driverRow, workerState.drivers)
+    def finishedDriverTable =
+      UIUtils.listingTable(driverHeaders, driverRow, workerState.finishedDrivers)
+
     val content =
       <div class="row-fluid"> <!-- Worker Details -->
         <div class="span12">
@@ -84,6 +90,20 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
           <h4> Finished Executors </h4>
           {finishedExecutorTable}
         </div>
+      </div>
+
+      <div class="row-fluid"> <!-- Running Drivers -->
+        <div class="span12">
+          <h4> Running Drivers {workerState.drivers.size} </h4>
+          {runningDriverTable}
+        </div>
+      </div>
+
+      <div class="row-fluid"> <!-- Finished Drivers -->
+        <div class="span12">
+          <h4> Finished Drivers </h4>
+          {finishedDriverTable}
+        </div>
       </div>;
     UIUtils.basicSparkPage(content, "Spark Worker at %s:%s".format(
@@ -111,6 +131,20 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
           .format(executor.appId, executor.execId)}>stderr</a>
       </td>
     </tr>
+  }
+  def driverRow(driver: DriverRunner): Seq[Node] = {
+    <tr>
+      <td>{driver.driverId}</td>
+      <td>{driver.mainClass}</td>
+      <td sorttable_customkey={driver.memory.toString}>
+        {Utils.megabytesToString(driver.memory)}
+      </td>
+      <td>
+        <a href={s"logPage?driverId=${driver.driverId}&logType=stdout"}>stdout</a>
+        <a href={s"logPage?driverId=${driver.driverId}&logType=stderr"}>stderr</a>
+      </td>
+    </tr>
+  }
 }
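The worker charges one core plus the driver's memory on `LaunchDriver` and refunds both when the terminal `DriverStateChanged` arrives, moving the runner into `finishedDrivers`. That bookkeeping in miniature, as an immutable model rather than the mutable fields the worker actually updates:

```scala
// Immutable sketch of the worker's driver accounting.
case class DriverLedger(
    coresUsed: Int = 0,
    memoryUsed: Int = 0,
    running: Map[String, Int] = Map.empty, // driverId -> memory
    finished: Set[String] = Set.empty) {

  def launch(driverId: String, mem: Int): DriverLedger =
    copy(coresUsed + 1, memoryUsed + mem, running + (driverId -> mem), finished)

  def finish(driverId: String): DriverLedger = {
    val mem = running(driverId)
    copy(coresUsed - 1, memoryUsed - mem, running - driverId, finished + driverId)
  }
}
```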
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index 40d6bdb3fd..d128e58797 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -69,30 +69,44 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
   def log(request: HttpServletRequest): String = {
     val defaultBytes = 100 * 1024
-    val appId = request.getParameter("appId")
-    val executorId = request.getParameter("executorId")
+
+    val appId = Option(request.getParameter("appId"))
+    val executorId = Option(request.getParameter("executorId"))
+    val driverId = Option(request.getParameter("driverId"))
     val logType = request.getParameter("logType")
     val offset = Option(request.getParameter("offset")).map(_.toLong)
     val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
-    val path = "%s/%s/%s/%s".format(workDir.getPath, appId, executorId, logType)
+
+    val path = (appId, executorId, driverId) match {
+      case (Some(a), Some(e), None) =>
+        s"${workDir.getPath}/$appId/$executorId/$logType"
+      case (None, None, Some(d)) =>
+        s"${workDir.getPath}/$driverId/$logType"
+    }
     val (startByte, endByte) = getByteRange(path, offset, byteLength)
     val file = new File(path)
     val logLength = file.length
-    val pre = "==== Bytes %s-%s of %s of %s/%s/%s ====\n"
-      .format(startByte, endByte, logLength, appId, executorId, logType)
+
+    val pre = s"==== Bytes $startByte-$endByte of $logLength of $path ====\n"
     pre + Utils.offsetBytes(path, startByte, endByte)
   }
   def logPage(request: HttpServletRequest): Seq[scala.xml.Node] = {
     val defaultBytes = 100 * 1024
-    val appId = request.getParameter("appId")
-    val executorId = request.getParameter("executorId")
+    val appId = Option(request.getParameter("appId"))
+    val executorId = Option(request.getParameter("executorId"))
+    val driverId = Option(request.getParameter("driverId"))
     val logType = request.getParameter("logType")
     val offset = Option(request.getParameter("offset")).map(_.toLong)
     val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
-    val path = "%s/%s/%s/%s".format(workDir.getPath, appId, executorId, logType)
+
+    val (path, params) = (appId, executorId, driverId) match {
+      case (Some(a), Some(e), None) =>
+        (s"${workDir.getPath}/$a/$e/$logType", s"appId=$a&executorId=$e")
+      case (None, None, Some(d)) =>
+        (s"${workDir.getPath}/$d/$logType", s"driverId=$d")
+    }
     val (startByte, endByte) = getByteRange(path, offset, byteLength)
     val file = new File(path)
@@ -106,9 +120,8 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
     val backButton =
       if (startByte > 0) {
-        <a href={"?appId=%s&executorId=%s&logType=%s&offset=%s&byteLength=%s"
-          .format(appId, executorId, logType, math.max(startByte-byteLength, 0),
-            byteLength)}>
+        <a href={"?%s&logType=%s&offset=%s&byteLength=%s"
+          .format(params, logType, math.max(startByte-byteLength, 0), byteLength)}>
           <button type="button" class="btn btn-default">
             Previous {Utils.bytesToString(math.min(byteLength, startByte))}
           </button>
@@ -122,8 +135,8 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
     val nextButton =
       if (endByte < logLength) {
-        <a href={"?appId=%s&executorId=%s&logType=%s&offset=%s&byteLength=%s".
-          format(appId, executorId, logType, endByte, byteLength)}>
+        <a href={"?%s&logType=%s&offset=%s&byteLength=%s".
+          format(params, logType, endByte, byteLength)}>
           <button type="button" class="btn btn-default">
            Next {Utils.bytesToString(math.min(byteLength, logLength-endByte))}
           </button>
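The parameter dispatch above matches only the two expected shapes, `(appId, executorId)` for executor logs and `(driverId)` for driver logs; any other combination throws a `MatchError`, and the `log()` variant interpolates the raw `Option` values (`Some(...)`) into the executor path. A hedged variant that makes the failure mode explicit instead:

```scala
// Hypothetical total version of the path dispatch (not in the commit).
def logPath(workDirPath: String, appId: Option[String], executorId: Option[String],
    driverId: Option[String], logType: String): Either[String, String] =
  (appId, executorId, driverId) match {
    case (Some(a), Some(e), None) => Right(s"$workDirPath/$a/$e/$logType")
    case (None, None, Some(d)) => Right(s"$workDirPath/$d/$logType")
    case other => Left(s"Unexpected log parameters: $other")
  }
```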