aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-12-25 00:24:00 -0800
committerPatrick Wendell <pwendell@gmail.com>2013-12-25 01:19:01 -0800
commitd5f23e0083b8b00109eb466ef5fb74558dcbc040 (patch)
treececf7993c60010213dca63c135475e7a7f7faddf /core/src/main/scala/org/apache
parent760823d3937822ea4a6d6f476815442711c605fa (diff)
downloadspark-d5f23e0083b8b00109eb466ef5fb74558dcbc040.tar.gz
spark-d5f23e0083b8b00109eb466ef5fb74558dcbc040.tar.bz2
spark-d5f23e0083b8b00109eb466ef5fb74558dcbc040.zip
Adding scheduling and reporting based on cores
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala5
6 files changed, 14 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 9bfacfc999..939e000695 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -425,7 +425,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
// First schedule drivers, they take strict precedence over applications
for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
for (driver <- Seq(waitingDrivers: _*)) {
- if (worker.memoryFree > driver.desc.mem) {
+ if (worker.memoryFree > driver.desc.mem && worker.coresFree > driver.desc.cores) {
launchDriver(worker, driver)
waitingDrivers -= driver
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
index 27c2ff4b8c..6e5177baa6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -88,13 +88,13 @@ private[spark] class WorkerInfo(
def addDriver(driver: DriverInfo) {
drivers(driver.id) = driver
memoryUsed += driver.desc.mem
- coresUsed += 1
+ coresUsed += driver.desc.cores
}
def removeDriver(driver: DriverInfo) {
drivers -= driver.id
memoryUsed -= driver.desc.mem
- coresUsed -= 1
+ coresUsed -= driver.desc.cores
}
def webUiAddress : String = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
index 13903b4a1d..24d10cec4a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
@@ -57,7 +57,7 @@ private[spark] class IndexPage(parent: MasterWebUI) {
val completedApps = state.completedApps.sortBy(_.endTime).reverse
val completedAppsTable = UIUtils.listingTable(appHeaders, appRow, completedApps)
- val driverHeaders = Seq("ID", "Submitted Time", "Worker", "State", "Memory", "Main Class")
+ val driverHeaders = Seq("ID", "Submitted Time", "Worker", "State", "Cores", "Memory", "Main Class")
val activeDrivers = state.activeDrivers.sortBy(_.startTime).reverse
val activeDriversTable = UIUtils.listingTable(driverHeaders, driverRow, activeDrivers)
val completedDrivers = state.completedDrivers.sortBy(_.startTime).reverse
@@ -166,6 +166,9 @@ private[spark] class IndexPage(parent: MasterWebUI) {
<td>{driver.submitDate}</td>
<td>{driver.worker.map(w => w.id.toString).getOrElse("None")}</td>
<td>{driver.state}</td>
+ <td sorttable_customkey={driver.desc.cores.toString}>
+ {driver.desc.cores.toString}
+ </td>
<td sorttable_customkey={driver.desc.mem.toString}>
{Utils.megabytesToString(driver.desc.mem.toLong)}
</td>
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 41a089ad33..ba13f22fc5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -53,8 +53,8 @@ private[spark] class DriverRunner(
try {
val driverDir = createWorkingDirectory()
val localJarFilename = downloadUserJar(driverDir)
- val command = Seq("java") ++ driverDesc.javaOptions ++ Seq("-cp", localJarFilename) ++
- Seq(driverDesc.mainClass) ++ driverDesc.options
+ val command = Seq("java") ++ driverDesc.javaOptions ++ Seq(s"-Xmx${driverDesc.mem}m")
+ Seq("-cp", localJarFilename) ++ Seq(driverDesc.mainClass) ++ driverDesc.options
runCommandWithRetry(command, driverDesc.envVars, driverDir)
}
catch {
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index dd6783a344..3c5159ac13 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -279,7 +279,7 @@ private[spark] class Worker(
}
val driver = drivers(driverId)
memoryUsed -= driver.driverDesc.mem
- coresUsed -= 1
+ coresUsed -= driver.driverDesc.cores
drivers -= driverId
finishedDrivers(driverId) = driver
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
index 2c37b7184d..35e8d58215 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
@@ -56,7 +56,7 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
val finishedExecutorTable =
UIUtils.listingTable(executorHeaders, executorRow, workerState.finishedExecutors)
- val driverHeaders = Seq("DriverID", "Main Class", "Memory", "Logs")
+ val driverHeaders = Seq("DriverID", "Main Class", "Cores", "Memory", "Logs")
val runningDriverTable =
UIUtils.listingTable(driverHeaders, driverRow, workerState.drivers)
def finishedDriverTable =
@@ -138,6 +138,9 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
<tr>
<td>{driver.driverId}</td>
<td>{driver.driverDesc.mainClass}</td>
+ <td sorttable_customkey={driver.driverDesc.cores.toString}>
+ {driver.driverDesc.cores.toString}
+ </td>
<td sorttable_customkey={driver.driverDesc.mem.toString}>
{Utils.megabytesToString(driver.driverDesc.mem)}
</td>