diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-12-25 00:24:00 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-12-25 01:19:01 -0800 |
commit | d5f23e0083b8b00109eb466ef5fb74558dcbc040 (patch) | |
tree | cecf7993c60010213dca63c135475e7a7f7faddf | |
parent | 760823d3937822ea4a6d6f476815442711c605fa (diff) | |
download | spark-d5f23e0083b8b00109eb466ef5fb74558dcbc040.tar.gz spark-d5f23e0083b8b00109eb466ef5fb74558dcbc040.tar.bz2 spark-d5f23e0083b8b00109eb466ef5fb74558dcbc040.zip |
Adding scheduling and reporting based on cores
6 files changed, 14 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 9bfacfc999..939e000695 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -425,7 +425,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act // First schedule drivers, they take strict precedence over applications for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) { for (driver <- Seq(waitingDrivers: _*)) { - if (worker.memoryFree > driver.desc.mem) { + if (worker.memoryFree > driver.desc.mem && worker.coresFree > driver.desc.cores) { launchDriver(worker, driver) waitingDrivers -= driver } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala index 27c2ff4b8c..6e5177baa6 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala @@ -88,13 +88,13 @@ private[spark] class WorkerInfo( def addDriver(driver: DriverInfo) { drivers(driver.id) = driver memoryUsed += driver.desc.mem - coresUsed += 1 + coresUsed += driver.desc.cores } def removeDriver(driver: DriverInfo) { drivers -= driver.id memoryUsed -= driver.desc.mem - coresUsed -= 1 + coresUsed -= driver.desc.cores } def webUiAddress : String = { diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala index 13903b4a1d..24d10cec4a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala @@ -57,7 +57,7 @@ private[spark] class IndexPage(parent: MasterWebUI) { val completedApps = state.completedApps.sortBy(_.endTime).reverse val completedAppsTable = UIUtils.listingTable(appHeaders, appRow, completedApps) - val driverHeaders = Seq("ID", "Submitted Time", "Worker", "State", "Memory", "Main Class") + val driverHeaders = Seq("ID", "Submitted Time", "Worker", "State", "Cores", "Memory", "Main Class") val activeDrivers = state.activeDrivers.sortBy(_.startTime).reverse val activeDriversTable = UIUtils.listingTable(driverHeaders, driverRow, activeDrivers) val completedDrivers = state.completedDrivers.sortBy(_.startTime).reverse @@ -166,6 +166,9 @@ private[spark] class IndexPage(parent: MasterWebUI) { <td>{driver.submitDate}</td> <td>{driver.worker.map(w => w.id.toString).getOrElse("None")}</td> <td>{driver.state}</td> + <td sorttable_customkey={driver.desc.cores.toString}> + {driver.desc.cores.toString} + </td> <td sorttable_customkey={driver.desc.mem.toString}> {Utils.megabytesToString(driver.desc.mem.toLong)} </td> diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala index 41a089ad33..ba13f22fc5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala @@ -53,8 +53,8 @@ private[spark] class DriverRunner( try { val driverDir = createWorkingDirectory() val localJarFilename = downloadUserJar(driverDir) - val command = Seq("java") ++ driverDesc.javaOptions ++ Seq("-cp", localJarFilename) ++ - Seq(driverDesc.mainClass) ++ driverDesc.options + val command = Seq("java") ++ driverDesc.javaOptions ++ Seq(s"-Xmx${driverDesc.mem}m") + Seq("-cp", localJarFilename) ++ Seq(driverDesc.mainClass) ++ driverDesc.options runCommandWithRetry(command, driverDesc.envVars, driverDir) } catch { diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index dd6783a344..3c5159ac13 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -279,7 +279,7 @@ private[spark] class Worker( } val driver = drivers(driverId) memoryUsed -= driver.driverDesc.mem - coresUsed -= 1 + coresUsed -= driver.driverDesc.cores drivers -= driverId finishedDrivers(driverId) = driver } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala index 2c37b7184d..35e8d58215 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala @@ -56,7 +56,7 @@ private[spark] class IndexPage(parent: WorkerWebUI) { val finishedExecutorTable = UIUtils.listingTable(executorHeaders, executorRow, workerState.finishedExecutors) - val driverHeaders = Seq("DriverID", "Main Class", "Memory", "Logs") + val driverHeaders = Seq("DriverID", "Main Class", "Cores", "Memory", "Logs") val runningDriverTable = UIUtils.listingTable(driverHeaders, driverRow, workerState.drivers) def finishedDriverTable = @@ -138,6 +138,9 @@ private[spark] class IndexPage(parent: WorkerWebUI) { <tr> <td>{driver.driverId}</td> <td>{driver.driverDesc.mainClass}</td> + <td sorttable_customkey={driver.driverDesc.cores.toString}> + {driver.driverDesc.cores.toString} + </td> <td sorttable_customkey={driver.driverDesc.mem.toString}> {Utils.megabytesToString(driver.driverDesc.mem)} </td> |