-rw-r--r--  core/src/main/resources/spark/deploy/master/webui/index.html | 6
-rw-r--r--  core/src/main/resources/spark/deploy/worker/webui/index.html | 6
-rw-r--r--  core/src/main/scala/spark/Utils.scala | 31
-rw-r--r--  core/src/main/scala/spark/deploy/DeployMessage.scala | 6
-rw-r--r--  core/src/main/scala/spark/deploy/Master.scala | 44
-rw-r--r--  core/src/main/scala/spark/deploy/MasterWebUI.scala | 12
-rw-r--r--  core/src/main/scala/spark/deploy/Worker.scala | 10
-rw-r--r--  core/src/main/scala/spark/deploy/master/Master.scala | 94
-rw-r--r--  core/src/main/scala/spark/deploy/master/MasterArguments.scala (renamed from core/src/main/scala/spark/deploy/MasterArguments.scala) | 10
-rw-r--r--  core/src/main/scala/spark/deploy/master/MasterWebUI.scala | 17
-rw-r--r--  core/src/main/scala/spark/deploy/worker/Worker.scala | 58
-rw-r--r--  core/src/main/scala/spark/deploy/worker/WorkerArguments.scala | 75
-rw-r--r--  core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala | 17
-rw-r--r--  core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala | 24
-rw-r--r--  core/src/main/scala/spark/util/MemoryParam.scala | 17
-rw-r--r--  core/src/test/scala/spark/MesosSchedulerSuite.scala | 30
-rw-r--r--  core/src/test/scala/spark/UtilsSuite.scala | 20
-rwxr-xr-x  run | 1
18 files changed, 351 insertions(+), 127 deletions(-)
diff --git a/core/src/main/resources/spark/deploy/master/webui/index.html b/core/src/main/resources/spark/deploy/master/webui/index.html
new file mode 100644
index 0000000000..c11101045e
--- /dev/null
+++ b/core/src/main/resources/spark/deploy/master/webui/index.html
@@ -0,0 +1,6 @@
+<html>
+<head><title>Hello world!</title></head>
+<body>
+<p>Hello world!</p>
+</body>
+</html>
\ No newline at end of file
diff --git a/core/src/main/resources/spark/deploy/worker/webui/index.html b/core/src/main/resources/spark/deploy/worker/webui/index.html
new file mode 100644
index 0000000000..c11101045e
--- /dev/null
+++ b/core/src/main/resources/spark/deploy/worker/webui/index.html
@@ -0,0 +1,6 @@
+<html>
+<head><title>Hello world!</title></head>
+<body>
+<p>Hello world!</p>
+</body>
+</html>
\ No newline at end of file
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 44dca2e4f1..95c833ea5b 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -200,9 +200,27 @@ object Utils {
}
/**
- * Use unit suffixes (Byte, Kilobyte, Megabyte, Gigabyte, Terabyte and
- * Petabyte) in order to reduce the number of digits to four or less. For
- * example, 4,000,000 is returned as 4MB.
+ * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
+ * This is used to figure out how much memory to claim from Mesos based on the SPARK_MEM
+ * environment variable.
+ */
+ def memoryStringToMb(str: String): Int = {
+ val lower = str.toLowerCase
+ if (lower.endsWith("k")) {
+ (lower.substring(0, lower.length-1).toLong / 1024).toInt
+ } else if (lower.endsWith("m")) {
+ lower.substring(0, lower.length-1).toInt
+ } else if (lower.endsWith("g")) {
+ lower.substring(0, lower.length-1).toInt * 1024
+ } else if (lower.endsWith("t")) {
+ lower.substring(0, lower.length-1).toInt * 1024 * 1024
+ } else { // no suffix, so it's just a number in bytes
+ (lower.toLong / 1024 / 1024).toInt
+ }
+ }
+
+ /**
+ * Convert a memory quantity in bytes to a human-readable string such as "4.0 MB".
*/
def memoryBytesToString(size: Long): String = {
val TB = 1L << 40
@@ -225,4 +243,11 @@ object Utils {
}
"%.1f %s".formatLocal(Locale.US, value, unit)
}
+
+ /**
+ * Convert a memory quantity in megabytes to a human-readable string such as "4.0 MB".
+ */
+ def memoryMegabytesToString(megabytes: Long): String = {
+ memoryBytesToString(megabytes * 1024L * 1024L)
+ }
}
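
For reference, the new Utils.memoryStringToMb treats suffixes case-insensitively and interprets a bare number as bytes, truncating to whole megabytes; a few illustrative conversions, all of which follow directly from the code above:

    Utils.memoryStringToMb("300m")        // 300
    Utils.memoryStringToMb("1G")          // 1024
    Utils.memoryStringToMb("2048k")       // 2
    Utils.memoryStringToMb("1048575")     // 0 (bytes, rounded down to whole MB)
    Utils.memoryMegabytesToString(2048L)  // "2.0 GB"
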
diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala
new file mode 100644
index 0000000000..4e641157e1
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/DeployMessage.scala
@@ -0,0 +1,6 @@
+package spark.deploy
+
+sealed trait DeployMessage
+
+case class RegisterSlave(host: String, port: Int, cores: Int, memory: Int) extends DeployMessage
+case class RegisteredSlave(clusterId: String, slaveId: Int) extends DeployMessage
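
These two messages form the registration handshake used later in this diff: a worker advertises its resources and the master replies with identifiers. A sketch of the exchange (the worker-side send is not part of this diff; master and sender stand for the respective ActorRefs):

    // Worker -> Master: advertise host, port, and resources
    master ! RegisterSlave(host, port, cores, memoryMb)
    // Master -> Worker: acknowledge with cluster and slave IDs
    sender ! RegisteredSlave(clusterId, slaveId)
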
diff --git a/core/src/main/scala/spark/deploy/Master.scala b/core/src/main/scala/spark/deploy/Master.scala
deleted file mode 100644
index da2f678f52..0000000000
--- a/core/src/main/scala/spark/deploy/Master.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-package spark.deploy
-
-import akka.actor.{ActorRef, Props, Actor, ActorSystem}
-import spark.{Logging, Utils}
-import scala.collection.immutable.{::, Nil}
-import spark.util.{AkkaUtils, IntParam}
-import cc.spray.Directives
-
-sealed trait MasterMessage
-case class RegisterSlave(host: String, port: Int, cores: Int, memory: Int) extends MasterMessage
-
-class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
- override def preStart() {
- logInfo("Starting Spark master at spark://" + ip + ":" + port)
- startWebUi()
- }
-
- def startWebUi() {
- val webUi = new MasterWebUI(context.system, self)
- try {
- AkkaUtils.startSprayServer(context.system, ip, webUiPort, webUi.handler)
- } catch {
- case e: Exception =>
- logError("Failed to create web UI", e)
- System.exit(1)
- }
- }
-
- override def receive = {
- case RegisterSlave(host, slavePort, cores, memory) =>
- logInfo("Registering slave %s:%d with %d cores, %s RAM".format(
- host, slavePort, cores, Utils.memoryBytesToString(memory * 1024L)))
- }
-}
-
-object Master {
- def main(args: Array[String]) {
- val params = new MasterArguments(args)
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", params.ip, params.port)
- val actor = actorSystem.actorOf(
- Props(new Master(params.ip, boundPort, params.webUiPort)), name = "Master")
- actorSystem.awaitTermination()
- }
-}
diff --git a/core/src/main/scala/spark/deploy/MasterWebUI.scala b/core/src/main/scala/spark/deploy/MasterWebUI.scala
deleted file mode 100644
index 3f078322e1..0000000000
--- a/core/src/main/scala/spark/deploy/MasterWebUI.scala
+++ /dev/null
@@ -1,12 +0,0 @@
-package spark.deploy
-
-import akka.actor.{ActorRef, ActorSystem}
-import cc.spray.Directives
-
-class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Directives {
- val handler = {
- path("") {
- get { _.complete("Hello world!") }
- }
- }
-}
diff --git a/core/src/main/scala/spark/deploy/Worker.scala b/core/src/main/scala/spark/deploy/Worker.scala
deleted file mode 100644
index 7210a4b902..0000000000
--- a/core/src/main/scala/spark/deploy/Worker.scala
+++ /dev/null
@@ -1,10 +0,0 @@
-package spark.deploy
-
-class Worker(cores: Int, memoryMb: Int) {
-
-}
-
-object Worker {
- def main(args: Array[String]) {
- }
-}
\ No newline at end of file
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
new file mode 100644
index 0000000000..8fdecfa08c
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -0,0 +1,94 @@
+package spark.deploy.master
+
+import scala.collection.mutable.HashMap
+
+import akka.actor.{Terminated, ActorRef, Props, Actor}
+import spark.{Logging, Utils}
+import spark.util.AkkaUtils
+import java.text.SimpleDateFormat
+import java.util.Date
+import spark.deploy.{RegisteredSlave, RegisterSlave}
+
+class SlaveInfo(
+ val id: Int,
+ val host: String,
+ val port: Int,
+ val cores: Int,
+ val memory: Int,
+ val actor: ActorRef) {
+ var coresUsed = 0
+ var memoryUsed = 0
+
+ def coresFree: Int = cores - coresUsed
+
+ def memoryFree: Int = memory - memoryUsed
+}
+
+class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
+ val clusterId = newClusterId()
+ var nextSlaveId = 0
+ var nextJobId = 0
+ val slaves = new HashMap[Int, SlaveInfo]
+ val actorToSlave = new HashMap[ActorRef, SlaveInfo]
+
+ override def preStart() {
+ logInfo("Starting Spark master at spark://" + ip + ":" + port)
+ logInfo("Cluster ID: " + clusterId)
+ startWebUi()
+ }
+
+ def startWebUi() {
+ val webUi = new MasterWebUI(context.system, self)
+ try {
+ AkkaUtils.startSprayServer(context.system, ip, webUiPort, webUi.handler)
+ } catch {
+ case e: Exception =>
+ logError("Failed to create web UI", e)
+ System.exit(1)
+ }
+ }
+
+ override def receive = {
+ case RegisterSlave(host, slavePort, cores, memory) => {
+ logInfo("Registering slave %s:%d with %d cores, %s RAM".format(
+ host, slavePort, cores, Utils.memoryMegabytesToString(memory)))
+ val id = newSlaveId()
+ slaves(id) = new SlaveInfo(id, host, slavePort, cores, memory, sender)
+ actorToSlave(sender) = slaves(id)
+ context.watch(sender)
+ sender ! RegisteredSlave(clusterId, id)
+ }
+
+ case Terminated(actor) => {
+ logInfo("Slave disconnected: " + actor)
+ actorToSlave.get(actor) match {
+ case Some(slave) =>
+ logInfo("Removing slave " + slave.id)
+ slaves -= slave.id
+ actorToSlave -= actor
+ case None =>
+ logError("Did not have any slave registered for " + actor)
+ }
+ }
+ }
+
+ def newClusterId(): String = {
+ val date = new SimpleDateFormat("yyyyMMdd-HHmmss").format(new Date())
+ "%s-%04d".format(date, (math.random * 10000).toInt)
+ }
+
+ def newSlaveId(): Int = {
+ nextSlaveId += 1
+ nextSlaveId - 1
+ }
+}
+
+object Master {
+ def main(argStrings: Array[String]) {
+ val args = new MasterArguments(argStrings)
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", args.ip, args.port)
+ val actor = actorSystem.actorOf(
+ Props(new Master(args.ip, boundPort, args.webUiPort)), name = "Master")
+ actorSystem.awaitTermination()
+ }
+}
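
The master relies on Akka death watch for cleanup: calling context.watch(sender) during registration means a Terminated(actor) message is delivered when a registered slave's actor dies, which triggers removal from both maps above. A minimal standalone sketch of that pattern (hypothetical Watcher actor, not from this diff):

    class Watcher(target: ActorRef) extends Actor {
      override def preStart() { context.watch(target) }  // subscribe to target's lifecycle
      def receive = {
        case Terminated(who) => println("Lost " + who)   // delivered when target stops
      }
    }
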
diff --git a/core/src/main/scala/spark/deploy/MasterArguments.scala b/core/src/main/scala/spark/deploy/master/MasterArguments.scala
index c948a405ef..ca4b8a143f 100644
--- a/core/src/main/scala/spark/deploy/MasterArguments.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterArguments.scala
@@ -1,4 +1,4 @@
-package spark.deploy
+package spark.deploy.master
import spark.util.IntParam
import spark.Utils
@@ -7,9 +7,9 @@ import spark.Utils
* Command-line parser for the master.
*/
class MasterArguments(args: Array[String]) {
- var ip: String = Utils.localIpAddress()
- var port: Int = 7077
- var webUiPort: Int = 8080
+ var ip = Utils.localIpAddress()
+ var port = 7077
+ var webUiPort = 8080
parse(args.toList)
@@ -45,7 +45,7 @@ class MasterArguments(args: Array[String]) {
"Options:\n" +
" -i IP, --ip IP IP address or DNS name to listen on\n" +
" -p PORT, --port PORT Port to listen on (default: 7077)\n" +
- " --webui-port PORT Port for web UI (default: 8080)\n")
+ " --webui-port PORT Port for web UI (default: 8080)")
System.exit(exitCode)
}
}
\ No newline at end of file
diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
new file mode 100644
index 0000000000..b0c871dd7b
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
@@ -0,0 +1,17 @@
+package spark.deploy.master
+
+import akka.actor.{ActorRef, ActorSystem}
+import cc.spray.Directives
+
+class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Directives {
+ val RESOURCE_DIR = "spark/deploy/master/webui"
+
+ val handler = {
+ get {
+ path("") {
+ getFromResource(RESOURCE_DIR + "/index.html")
+ } ~
+ getFromResourceDirectory(RESOURCE_DIR)
+ }
+ }
+}
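
In spray's routing DSL the ~ operator chains alternatives, so this handler serves the bundled index.html for GET / and falls through to getFromResourceDirectory for any other path: a GET for, say, /logo.png (hypothetical file) would resolve to the classpath resource spark/deploy/master/webui/logo.png. The WorkerWebUI below follows the same pattern.
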
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
new file mode 100644
index 0000000000..fd49223798
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -0,0 +1,58 @@
+package spark.deploy.worker
+
+import scala.collection.mutable.HashMap
+
+import akka.actor.{Terminated, ActorRef, Props, Actor}
+import spark.{Logging, Utils}
+import spark.util.AkkaUtils
+import java.text.SimpleDateFormat
+import java.util.Date
+import spark.deploy.{RegisteredSlave, RegisterSlave}
+
+class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int)
+ extends Actor with Logging {
+
+ var coresUsed = 0
+ var memoryUsed = 0
+
+ def coresFree: Int = cores - coresUsed
+ def memoryFree: Int = memory - memoryUsed
+
+ override def preStart() {
+ logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
+ ip, port, cores, Utils.memoryMegabytesToString(memory)))
+ startWebUi()
+ }
+
+ def startWebUi() {
+ val webUi = new WorkerWebUI(context.system, self)
+ try {
+ AkkaUtils.startSprayServer(context.system, ip, webUiPort, webUi.handler)
+ } catch {
+ case e: Exception =>
+ logError("Failed to create web UI", e)
+ System.exit(1)
+ }
+ }
+
+ override def receive = {
+ case RegisteredSlave(clusterId, slaveId) => {
+ logInfo("Registered with cluster ID " + clusterId + ", slave ID " + slaveId)
+ }
+
+ case Terminated(actor) => {
+ logError("Master disconnected!")
+ }
+ }
+}
+
+object Worker {
+ def main(argStrings: Array[String]) {
+ val args = new WorkerArguments(argStrings)
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", args.ip, args.port)
+ val actor = actorSystem.actorOf(
+ Props(new Worker(args.ip, boundPort, args.webUiPort, args.cores, args.memory)),
+ name = "Worker")
+ actorSystem.awaitTermination()
+ }
+}
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala
new file mode 100644
index 0000000000..cd112b7fa3
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala
@@ -0,0 +1,75 @@
+package spark.deploy.worker
+
+import spark.util.IntParam
+import spark.util.MemoryParam
+import spark.Utils
+import java.lang.management.ManagementFactory
+
+/**
+ * Command-line parser for the worker.
+ */
+class WorkerArguments(args: Array[String]) {
+ var ip = Utils.localIpAddress()
+ var port = 0
+ var webUiPort = 8081
+ var cores = inferDefaultCores()
+ var memory = inferDefaultMemory()
+
+ parse(args.toList)
+
+ def parse(args: List[String]): Unit = args match {
+ case ("--ip" | "-i") :: value :: tail =>
+ ip = value
+ parse(tail)
+
+ case ("--port" | "-p") :: IntParam(value) :: tail =>
+ port = value
+ parse(tail)
+
+ case ("--cores" | "-c") :: IntParam(value) :: tail =>
+ cores = value
+ parse(tail)
+
+ case ("--memory" | "-m") :: MemoryParam(value) :: tail =>
+ memory = value
+ parse(tail)
+
+ case "--webui-port" :: IntParam(value) :: tail =>
+ webUiPort = value
+ parse(tail)
+
+ case ("--help" | "-h") :: tail =>
+ printUsageAndExit(0)
+
+ case Nil => {}
+
+ case _ =>
+ printUsageAndExit(1)
+ }
+
+ /**
+ * Print usage and exit JVM with the given exit code.
+ */
+ def printUsageAndExit(exitCode: Int) {
+ System.err.println(
+ "Usage: spark-worker [options]\n" +
+ "\n" +
+ "Options:\n" +
+ " -c CORES, --cores CORES Number of cores to use\n" +
+ " -m MEM, --memory MEM Amount of memory to use (e.g. 1000M, 2G)\n" +
+ " -i IP, --ip IP IP address or DNS name to listen on\n" +
+ " -p PORT, --port PORT Port to listen on (default: random)\n" +
+ " --webui-port PORT Port for web UI (default: 8081)")
+ System.exit(exitCode)
+ }
+
+ def inferDefaultCores(): Int = {
+ Runtime.getRuntime.availableProcessors()
+ }
+
+ def inferDefaultMemory(): Int = {
+ val bean = ManagementFactory.getOperatingSystemMXBean
+ .asInstanceOf[com.sun.management.OperatingSystemMXBean]
+ (bean.getTotalPhysicalMemorySize / 1024 / 1024).toInt
+ }
+}
\ No newline at end of file
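
Putting the parser together, a worker might be launched as follows (hypothetical values; -m accepts any format Utils.memoryStringToMb understands, so "2G" parses to 2048 MB):

    spark-worker --ip 10.0.0.5 --port 7078 --cores 4 --memory 2G
    spark-worker -c 8 -m 4096M --webui-port 8082
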
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
new file mode 100644
index 0000000000..efd3822e61
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
@@ -0,0 +1,17 @@
+package spark.deploy.worker
+
+import akka.actor.{ActorRef, ActorSystem}
+import cc.spray.Directives
+
+class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Directives {
+ val RESOURCE_DIR = "spark/deploy/worker/webui"
+
+ val handler = {
+ get {
+ path("") {
+ getFromResource(RESOURCE_DIR + "/index.html")
+ } ~
+ getFromResourceDirectory(RESOURCE_DIR)
+ }
+ }
+}
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala b/core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala
index 8e34537674..9113348976 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala
@@ -45,7 +45,7 @@ class MesosScheduler(
// Memory used by each executor (in megabytes)
val EXECUTOR_MEMORY = {
if (System.getenv("SPARK_MEM") != null) {
- MesosScheduler.memoryStringToMb(System.getenv("SPARK_MEM"))
+ Utils.memoryStringToMb(System.getenv("SPARK_MEM"))
// TODO: Might need to add some extra memory for the non-heap parts of the JVM
} else {
512
@@ -467,25 +467,3 @@ class MesosScheduler(
driver.reviveOffers()
}
}
-
-object MesosScheduler {
- /**
- * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
- * This is used to figure out how much memory to claim from Mesos based on the SPARK_MEM
- * environment variable.
- */
- def memoryStringToMb(str: String): Int = {
- val lower = str.toLowerCase
- if (lower.endsWith("k")) {
- (lower.substring(0, lower.length-1).toLong / 1024).toInt
- } else if (lower.endsWith("m")) {
- lower.substring(0, lower.length-1).toInt
- } else if (lower.endsWith("g")) {
- lower.substring(0, lower.length-1).toInt * 1024
- } else if (lower.endsWith("t")) {
- lower.substring(0, lower.length-1).toInt * 1024 * 1024
- } else { // no suffix, so it's just a number in bytes
- (lower.toLong / 1024 / 1024).toInt
- }
- }
-}
diff --git a/core/src/main/scala/spark/util/MemoryParam.scala b/core/src/main/scala/spark/util/MemoryParam.scala
new file mode 100644
index 0000000000..4fba914afe
--- /dev/null
+++ b/core/src/main/scala/spark/util/MemoryParam.scala
@@ -0,0 +1,17 @@
+package spark.util
+
+import spark.Utils
+
+/**
+ * An extractor object for parsing JVM memory strings, such as "10g", into an Int representing
+ * the number of megabytes. Supports the same formats as Utils.memoryStringToMb.
+ */
+object MemoryParam {
+ def unapply(str: String): Option[Int] = {
+ try {
+ Some(Utils.memoryStringToMb(str))
+ } catch {
+ case e: NumberFormatException => None
+ }
+ }
+}
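
Being an extractor, MemoryParam composes with pattern matching exactly as IntParam does in the argument parsers above; a minimal sketch:

    "2g" match {
      case MemoryParam(mb) => println(mb + " MB")  // prints "2048 MB"
      case other           => println("not a memory string: " + other)
    }
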
diff --git a/core/src/test/scala/spark/MesosSchedulerSuite.scala b/core/src/test/scala/spark/MesosSchedulerSuite.scala
deleted file mode 100644
index 54421225d8..0000000000
--- a/core/src/test/scala/spark/MesosSchedulerSuite.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-package spark
-
-import org.scalatest.FunSuite
-
-import spark.scheduler.mesos.MesosScheduler
-
-class MesosSchedulerSuite extends FunSuite {
- test("memoryStringToMb"){
-
- assert(MesosScheduler.memoryStringToMb("1") == 0)
- assert(MesosScheduler.memoryStringToMb("1048575") == 0)
- assert(MesosScheduler.memoryStringToMb("3145728") == 3)
-
- assert(MesosScheduler.memoryStringToMb("1024k") == 1)
- assert(MesosScheduler.memoryStringToMb("5000k") == 4)
- assert(MesosScheduler.memoryStringToMb("4024k") == MesosScheduler.memoryStringToMb("4024K"))
-
- assert(MesosScheduler.memoryStringToMb("1024m") == 1024)
- assert(MesosScheduler.memoryStringToMb("5000m") == 5000)
- assert(MesosScheduler.memoryStringToMb("4024m") == MesosScheduler.memoryStringToMb("4024M"))
-
- assert(MesosScheduler.memoryStringToMb("2g") == 2048)
- assert(MesosScheduler.memoryStringToMb("3g") == MesosScheduler.memoryStringToMb("3G"))
-
- assert(MesosScheduler.memoryStringToMb("2t") == 2097152)
- assert(MesosScheduler.memoryStringToMb("3t") == MesosScheduler.memoryStringToMb("3T"))
-
-
- }
-}
diff --git a/core/src/test/scala/spark/UtilsSuite.scala b/core/src/test/scala/spark/UtilsSuite.scala
index a46d90223e..ed4701574f 100644
--- a/core/src/test/scala/spark/UtilsSuite.scala
+++ b/core/src/test/scala/spark/UtilsSuite.scala
@@ -26,5 +26,25 @@ class UtilsSuite extends FunSuite {
assert(os.toByteArray.toList.equals(bytes.toList))
}
+
+ test("memoryStringToMb"){
+ assert(Utils.memoryStringToMb("1") == 0)
+ assert(Utils.memoryStringToMb("1048575") == 0)
+ assert(Utils.memoryStringToMb("3145728") == 3)
+
+ assert(Utils.memoryStringToMb("1024k") == 1)
+ assert(Utils.memoryStringToMb("5000k") == 4)
+ assert(Utils.memoryStringToMb("4024k") == Utils.memoryStringToMb("4024K"))
+
+ assert(Utils.memoryStringToMb("1024m") == 1024)
+ assert(Utils.memoryStringToMb("5000m") == 5000)
+ assert(Utils.memoryStringToMb("4024m") == Utils.memoryStringToMb("4024M"))
+
+ assert(Utils.memoryStringToMb("2g") == 2048)
+ assert(Utils.memoryStringToMb("3g") == Utils.memoryStringToMb("3G"))
+
+ assert(Utils.memoryStringToMb("2t") == 2097152)
+ assert(Utils.memoryStringToMb("3t") == Utils.memoryStringToMb("3T"))
+ }
}
diff --git a/run b/run
index 2bc025ec0b..5ba94b3243 100755
--- a/run
+++ b/run
@@ -46,6 +46,7 @@ CLASSPATH="$SPARK_CLASSPATH"
CLASSPATH+=":$MESOS_CLASSPATH"
CLASSPATH+=":$FWDIR/conf"
CLASSPATH+=":$CORE_DIR/target/scala-$SCALA_VERSION/classes"
+CLASSPATH+=":$CORE_DIR/src/main/resources"
CLASSPATH+=":$REPL_DIR/target/scala-$SCALA_VERSION/classes"
CLASSPATH+=":$EXAMPLES_DIR/target/scala-$SCALA_VERSION/classes"
for jar in `find $CORE_DIR/lib -name '*jar'`; do