aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorRichard Benkovsky <richard.benkovsky@gooddata.com>2012-05-20 11:14:12 +0200
committerRichard Benkovsky <richard.benkovsky@gooddata.com>2012-05-22 11:04:54 +0200
commitae64920337f1658a5b3a2ce65fd94679cc23822f (patch)
tree40590483dfd9b9d3de4aa54663d2626433ce6708 /core
parent3a1bcd4028d84fa5cc7a7cb230f41ae6bb87c352 (diff)
downloadspark-ae64920337f1658a5b3a2ce65fd94679cc23822f.tar.gz
spark-ae64920337f1658a5b3a2ce65fd94679cc23822f.tar.bz2
spark-ae64920337f1658a5b3a2ce65fd94679cc23822f.zip
MesosScheduler refactoring
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/MesosScheduler.scala27
-rw-r--r--core/src/test/scala/spark/MesosSchedulerSuite.scala28
2 files changed, 42 insertions, 13 deletions
diff --git a/core/src/main/scala/spark/MesosScheduler.scala b/core/src/main/scala/spark/MesosScheduler.scala
index b95f40b877..755e001106 100644
--- a/core/src/main/scala/spark/MesosScheduler.scala
+++ b/core/src/main/scala/spark/MesosScheduler.scala
@@ -42,7 +42,7 @@ private class MesosScheduler(
// Memory used by each executor (in megabytes)
val EXECUTOR_MEMORY = {
if (System.getenv("SPARK_MEM") != null) {
- memoryStringToMb(System.getenv("SPARK_MEM"))
+ MesosScheduler.memoryStringToMb(System.getenv("SPARK_MEM"))
// TODO: Might need to add some extra memory for the non-heap parts of the JVM
} else {
512
@@ -78,9 +78,7 @@ private class MesosScheduler(
// Sorts jobs in reverse order of run ID for use in our priority queue (so lower IDs run first)
private val jobOrdering = new Ordering[Job] {
- override def compare(j1: Job, j2: Job): Int = {
- return j2.runId - j1.runId
- }
+ override def compare(j1: Job, j2: Job): Int = j2.runId - j1.runId
}
def newJobId(): Int = this.synchronized {
@@ -156,7 +154,7 @@ private class MesosScheduler(
activeJobs(jobId) = myJob
activeJobsQueue += myJob
logInfo("Adding job with ID " + jobId)
- jobTasks(jobId) = new HashSet()
+ jobTasks(jobId) = HashSet.empty[String]
}
driver.reviveOffers();
}
@@ -376,24 +374,27 @@ private class MesosScheduler(
}
override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
+}
+object MesosScheduler {
/**
- * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
- * This is used to figure out how much memory to claim from Mesos based on the SPARK_MEM
+ * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
+ * This is used to figure out how much memory to claim from Mesos based on the SPARK_MEM
* environment variable.
*/
def memoryStringToMb(str: String): Int = {
val lower = str.toLowerCase
if (lower.endsWith("k")) {
- (lower.substring(0, lower.length-1).toLong / 1024).toInt
+ (lower.substring(0, lower.length - 1).toLong / 1024).toInt
} else if (lower.endsWith("m")) {
- lower.substring(0, lower.length-1).toInt
+ lower.substring(0, lower.length - 1).toInt
} else if (lower.endsWith("g")) {
- lower.substring(0, lower.length-1).toInt * 1024
+ lower.substring(0, lower.length - 1).toInt * 1024
} else if (lower.endsWith("t")) {
- lower.substring(0, lower.length-1).toInt * 1024 * 1024
- } else {// no suffix, so it's just a number in bytes
+ lower.substring(0, lower.length - 1).toInt * 1024 * 1024
+ } else {
+ // no suffix, so it's just a number in bytes
(lower.toLong / 1024 / 1024).toInt
}
}
-}
+} \ No newline at end of file
diff --git a/core/src/test/scala/spark/MesosSchedulerSuite.scala b/core/src/test/scala/spark/MesosSchedulerSuite.scala
new file mode 100644
index 0000000000..0e6820cbdc
--- /dev/null
+++ b/core/src/test/scala/spark/MesosSchedulerSuite.scala
@@ -0,0 +1,28 @@
+package spark
+
+import org.scalatest.FunSuite
+
+class MesosSchedulerSuite extends FunSuite {
+ test("memoryStringToMb"){
+
+ assert(MesosScheduler.memoryStringToMb("1") == 0)
+ assert(MesosScheduler.memoryStringToMb("1048575") == 0)
+ assert(MesosScheduler.memoryStringToMb("3145728") == 3)
+
+ assert(MesosScheduler.memoryStringToMb("1024k") == 1)
+ assert(MesosScheduler.memoryStringToMb("5000k") == 4)
+ assert(MesosScheduler.memoryStringToMb("4024k") == MesosScheduler.memoryStringToMb("4024K"))
+
+ assert(MesosScheduler.memoryStringToMb("1024m") == 1024)
+ assert(MesosScheduler.memoryStringToMb("5000m") == 5000)
+ assert(MesosScheduler.memoryStringToMb("4024m") == MesosScheduler.memoryStringToMb("4024M"))
+
+ assert(MesosScheduler.memoryStringToMb("2g") == 2048)
+ assert(MesosScheduler.memoryStringToMb("3g") == MesosScheduler.memoryStringToMb("3G"))
+
+ assert(MesosScheduler.memoryStringToMb("2t") == 2097152)
+ assert(MesosScheduler.memoryStringToMb("3t") == MesosScheduler.memoryStringToMb("3T"))
+
+
+ }
+}