author     Matei Zaharia <matei@eecs.berkeley.edu>  2013-01-10 19:06:40 -0800
committer  Matei Zaharia <matei@eecs.berkeley.edu>  2013-01-10 19:06:40 -0800
commit     3548c9c0c8ae61770f6b23d905fdcb4d276cc0db (patch)
tree       394050fc6697f15306306480d8e2db66058ca17b /core
parent     9cc764f52323baa3a218ce9e301d3cc98f1e8b20 (diff)
parent     6d1c2302810f497433df6ab8faa6bd5f40aed27f (diff)
Merge branch 'master' of github.com:mesos/spark
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/deploy/JsonProtocol.scala           | 57
-rw-r--r--  core/src/main/scala/spark/deploy/master/MasterWebUI.scala     | 19
-rw-r--r--  core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala     | 20
-rw-r--r--  core/src/main/scala/spark/network/Connection.scala            |  7
-rw-r--r--  core/src/main/scala/spark/network/ConnectionManager.scala     | 12
-rw-r--r--  core/src/main/scala/spark/network/ConnectionManagerTest.scala | 24
6 files changed, 113 insertions(+), 26 deletions(-)
diff --git a/core/src/main/scala/spark/deploy/JsonProtocol.scala b/core/src/main/scala/spark/deploy/JsonProtocol.scala
new file mode 100644
index 0000000000..f14f804b3a
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/JsonProtocol.scala
@@ -0,0 +1,57 @@
+package spark.deploy
+
+import master.{JobInfo, WorkerInfo}
+import cc.spray.json._
+
+/**
+ * spray-json helper class containing implicit conversion to json for marshalling responses
+ */
+private[spark] object JsonProtocol extends DefaultJsonProtocol {
+ implicit object WorkerInfoJsonFormat extends RootJsonWriter[WorkerInfo] {
+ def write(obj: WorkerInfo) = JsObject(
+ "id" -> JsString(obj.id),
+ "host" -> JsString(obj.host),
+ "webuiaddress" -> JsString(obj.webUiAddress),
+ "cores" -> JsNumber(obj.cores),
+ "coresused" -> JsNumber(obj.coresUsed),
+ "memory" -> JsNumber(obj.memory),
+ "memoryused" -> JsNumber(obj.memoryUsed)
+ )
+ }
+
+ implicit object JobInfoJsonFormat extends RootJsonWriter[JobInfo] {
+ def write(obj: JobInfo) = JsObject(
+ "starttime" -> JsNumber(obj.startTime),
+ "id" -> JsString(obj.id),
+ "name" -> JsString(obj.desc.name),
+ "cores" -> JsNumber(obj.desc.cores),
+ "user" -> JsString(obj.desc.user),
+ "memoryperslave" -> JsNumber(obj.desc.memoryPerSlave),
+ "submitdate" -> JsString(obj.submitDate.toString))
+ }
+
+ implicit object MasterStateJsonFormat extends RootJsonWriter[MasterState] {
+ def write(obj: MasterState) = JsObject(
+ "url" -> JsString("spark://" + obj.uri),
+ "workers" -> JsArray(obj.workers.toList.map(_.toJson)),
+ "cores" -> JsNumber(obj.workers.map(_.cores).sum),
+ "coresused" -> JsNumber(obj.workers.map(_.coresUsed).sum),
+ "memory" -> JsNumber(obj.workers.map(_.memory).sum),
+ "memoryused" -> JsNumber(obj.workers.map(_.memoryUsed).sum),
+ "activejobs" -> JsArray(obj.activeJobs.toList.map(_.toJson)),
+ "completedjobs" -> JsArray(obj.completedJobs.toList.map(_.toJson))
+ )
+ }
+
+ implicit object WorkerStateJsonFormat extends RootJsonWriter[WorkerState] {
+ def write(obj: WorkerState) = JsObject(
+ "id" -> JsString(obj.workerId),
+ "masterurl" -> JsString(obj.masterUrl),
+ "masterwebuiurl" -> JsString(obj.masterWebUiUrl),
+ "cores" -> JsNumber(obj.cores),
+ "coresused" -> JsNumber(obj.coresUsed),
+ "memory" -> JsNumber(obj.memory),
+ "memoryused" -> JsNumber(obj.memoryUsed)
+ )
+ }
+}
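
The writers in this new file all follow one spray-json pattern: a RootJsonWriter per type that builds a JsObject field by field, picked up implicitly by toJson. A minimal, self-contained sketch of the same pattern follows; WorkerSummary and the demo object are illustrative stand-ins for the spark.deploy types, and only the cc.spray.json artifact is assumed on the classpath.

import cc.spray.json._

case class WorkerSummary(id: String, cores: Int, coresUsed: Int)

object WorkerSummaryProtocol extends DefaultJsonProtocol {
  implicit object WorkerSummaryFormat extends RootJsonWriter[WorkerSummary] {
    def write(obj: WorkerSummary) = JsObject(
      "id"        -> JsString(obj.id),
      "cores"     -> JsNumber(obj.cores),
      "coresused" -> JsNumber(obj.coresUsed)
    )
  }
}

object JsonProtocolDemo {
  import WorkerSummaryProtocol._
  def main(args: Array[String]) {
    // toJson resolves the implicit writer above; println renders the JsValue
    println(WorkerSummary("worker-1", 8, 2).toJson)
  }
}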
diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
index 3cdd3721f5..a96b55d6f3 100644
--- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
@@ -9,6 +9,9 @@ import cc.spray.Directives
import cc.spray.directives._
import cc.spray.typeconversion.TwirlSupport._
import spark.deploy._
+import cc.spray.http.MediaTypes
+import JsonProtocol._
+import cc.spray.typeconversion.SprayJsonSupport._
private[spark]
class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Directives {
@@ -19,13 +22,19 @@ class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Directives {
val handler = {
get {
- path("") {
- completeWith {
+ (path("") & parameters('format ?)) {
+ case Some(js) if js.equalsIgnoreCase("json") =>
val future = master ? RequestMasterState
- future.map {
- masterState => spark.deploy.master.html.index.render(masterState.asInstanceOf[MasterState])
+ respondWithMediaType(MediaTypes.`application/json`) { ctx =>
+ ctx.complete(future.mapTo[MasterState])
+ }
+ case _ =>
+ completeWith {
+ val future = master ? RequestMasterState
+ future.map {
+ masterState => spark.deploy.master.html.index.render(masterState.asInstanceOf[MasterState])
+ }
}
- }
} ~
path("job") {
parameter("jobId") { jobId =>
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
index d06f4884ee..84b6c16bd6 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
@@ -7,7 +7,10 @@ import akka.util.Timeout
import akka.util.duration._
import cc.spray.Directives
import cc.spray.typeconversion.TwirlSupport._
-import spark.deploy.{WorkerState, RequestWorkerState}
+import spark.deploy.{JsonProtocol, WorkerState, RequestWorkerState}
+import cc.spray.http.MediaTypes
+import JsonProtocol._
+import cc.spray.typeconversion.SprayJsonSupport._
private[spark]
class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Directives {
@@ -18,13 +21,20 @@ class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Directives {
val handler = {
get {
- path("") {
- completeWith{
+ (path("") & parameters('format ?)) {
+ case Some(js) if js.equalsIgnoreCase("json") => {
val future = worker ? RequestWorkerState
- future.map { workerState =>
- spark.deploy.worker.html.index(workerState.asInstanceOf[WorkerState])
+ respondWithMediaType(MediaTypes.`application/json`) { ctx =>
+ ctx.complete(future.mapTo[WorkerState])
}
}
+ case _ =>
+ completeWith{
+ val future = worker ? RequestWorkerState
+ future.map { workerState =>
+ spark.deploy.worker.html.index(workerState.asInstanceOf[WorkerState])
+ }
+ }
} ~
path("log") {
parameters("jobId", "executorId", "logType") { (jobId, executorId, logType) =>
diff --git a/core/src/main/scala/spark/network/Connection.scala b/core/src/main/scala/spark/network/Connection.scala
index 80262ab7b4..c193bf7c8d 100644
--- a/core/src/main/scala/spark/network/Connection.scala
+++ b/core/src/main/scala/spark/network/Connection.scala
@@ -135,8 +135,11 @@ extends Connection(SocketChannel.open, selector_) {
val chunk = message.getChunkForSending(defaultChunkSize)
if (chunk.isDefined) {
messages += message // this is probably incorrect, it won't work as FIFO
- if (!message.started) logDebug("Starting to send [" + message + "]")
- message.started = true
+ if (!message.started) {
+ logDebug("Starting to send [" + message + "]")
+ message.started = true
+ message.startTime = System.currentTimeMillis
+ }
return chunk
} else {
/*logInfo("Finished sending [" + message + "] to [" + remoteConnectionManagerId + "]")*/
diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala
index 642fa4b525..36c01ad629 100644
--- a/core/src/main/scala/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/spark/network/ConnectionManager.scala
@@ -43,12 +43,12 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
}
val selector = SelectorProvider.provider.openSelector()
- val handleMessageExecutor = Executors.newFixedThreadPool(4)
+ val handleMessageExecutor = Executors.newFixedThreadPool(System.getProperty("spark.core.connection.handler.threads","20").toInt)
val serverChannel = ServerSocketChannel.open()
val connectionsByKey = new HashMap[SelectionKey, Connection] with SynchronizedMap[SelectionKey, Connection]
val connectionsById = new HashMap[ConnectionManagerId, SendingConnection] with SynchronizedMap[ConnectionManagerId, SendingConnection]
val messageStatuses = new HashMap[Int, MessageStatus]
- val connectionRequests = new SynchronizedQueue[SendingConnection]
+ val connectionRequests = new HashMap[ConnectionManagerId, SendingConnection] with SynchronizedMap[ConnectionManagerId, SendingConnection]
val keyInterestChangeRequests = new SynchronizedQueue[(SelectionKey, Int)]
val sendMessageRequests = new Queue[(Message, SendingConnection)]
@@ -79,10 +79,10 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
def run() {
try {
while(!selectorThread.isInterrupted) {
- while(!connectionRequests.isEmpty) {
- val sendingConnection = connectionRequests.dequeue
+ for( (connectionManagerId, sendingConnection) <- connectionRequests) {
sendingConnection.connect()
addConnection(sendingConnection)
+ connectionRequests -= connectionManagerId
}
sendMessageRequests.synchronized {
while(!sendMessageRequests.isEmpty) {
@@ -300,8 +300,7 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
private def sendMessage(connectionManagerId: ConnectionManagerId, message: Message) {
def startNewConnection(): SendingConnection = {
val inetSocketAddress = new InetSocketAddress(connectionManagerId.host, connectionManagerId.port)
- val newConnection = new SendingConnection(inetSocketAddress, selector)
- connectionRequests += newConnection
+ val newConnection = connectionRequests.getOrElseUpdate(connectionManagerId, new SendingConnection(inetSocketAddress, selector))
newConnection
}
val lookupKey = ConnectionManagerId.fromSocketAddress(connectionManagerId.toSocketAddress)
@@ -473,6 +472,7 @@ private[spark] object ConnectionManager {
val mb = size * count / 1024.0 / 1024.0
val ms = finishTime - startTime
val tput = mb * 1000.0 / ms
+ println("Sent " + mb + " MB in " + ms + " ms (" + tput + " MB/s)")
println("--------------------------")
println()
}
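
The key change in this file is replacing the SynchronizedQueue of pending connections with a SynchronizedMap keyed by ConnectionManagerId, so getOrElseUpdate lets concurrent senders to the same peer share a single pending SendingConnection instead of enqueueing duplicates. A sketch of that dedup pattern under simplified types (Peer and Conn stand in for ConnectionManagerId and SendingConnection):

import scala.collection.mutable.{HashMap, SynchronizedMap}

case class Peer(host: String, port: Int)
class Conn(val peer: Peer)

object ConnDedup {
  val pending = new HashMap[Peer, Conn] with SynchronizedMap[Peer, Conn]

  // getOrElseUpdate creates the connection only if none is pending for this peer
  def request(p: Peer): Conn = pending.getOrElseUpdate(p, new Conn(p))

  def main(args: Array[String]) {
    val p = Peer("host-a", 7077)
    val c1 = request(p)
    val c2 = request(p)
    println(c1 eq c2) // true: the second request reuses the pending connection
  }
}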
diff --git a/core/src/main/scala/spark/network/ConnectionManagerTest.scala b/core/src/main/scala/spark/network/ConnectionManagerTest.scala
index 47ceaf3c07..533e4610f3 100644
--- a/core/src/main/scala/spark/network/ConnectionManagerTest.scala
+++ b/core/src/main/scala/spark/network/ConnectionManagerTest.scala
@@ -13,8 +13,14 @@ import akka.util.duration._
private[spark] object ConnectionManagerTest extends Logging{
def main(args: Array[String]) {
+ // <mesos cluster> - the master URL
+ // <slaves file> - a list of slaves to run connectionTest on
+ // [num of tasks] - number of parallel tasks to launch; defaults to the number of slave hosts
+ // [size of msg in MB (integer)] - size of the message sent by each task; defaults to 10
+ // [count] - number of rounds to run; defaults to 3
+ // [await time in seconds] - how long to await each send; defaults to 600
if (args.length < 2) {
- println("Usage: ConnectionManagerTest <mesos cluster> <slaves file>")
+ println("Usage: ConnectionManagerTest <mesos cluster> <slaves file> [num of tasks] [size of msg in MB (integer)] [count] [await time in seconds)] ")
System.exit(1)
}
@@ -29,16 +35,19 @@ private[spark] object ConnectionManagerTest extends Logging{
/*println("Slaves")*/
/*slaves.foreach(println)*/
-
- val slaveConnManagerIds = sc.parallelize(0 until slaves.length, slaves.length).map(
+ val tasknum = if (args.length > 2) args(2).toInt else slaves.length
+ val size = ( if (args.length > 3) (args(3).toInt) else 10 ) * 1024 * 1024
+ val count = if (args.length > 4) args(4).toInt else 3
+ val awaitTime = (if (args.length > 5) args(5).toInt else 600 ).second
+ println("Running "+count+" rounds of test: " + "parallel tasks = " + tasknum + ", msg size = " + size/1024/1024 + " MB, awaitTime = " + awaitTime)
+ val slaveConnManagerIds = sc.parallelize(0 until tasknum, tasknum).map(
i => SparkEnv.get.connectionManager.id).collect()
println("\nSlave ConnectionManagerIds")
slaveConnManagerIds.foreach(println)
println
- val count = 10
(0 until count).foreach(i => {
- val resultStrs = sc.parallelize(0 until slaves.length, slaves.length).map(i => {
+ val resultStrs = sc.parallelize(0 until tasknum, tasknum).map(i => {
val connManager = SparkEnv.get.connectionManager
val thisConnManagerId = connManager.id
connManager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
@@ -46,7 +55,6 @@ private[spark] object ConnectionManagerTest extends Logging{
None
})
- val size = 100 * 1024 * 1024
val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
buffer.flip
@@ -56,13 +64,13 @@ private[spark] object ConnectionManagerTest extends Logging{
logInfo("Sending [" + bufferMessage + "] to [" + slaveConnManagerId + "]")
connManager.sendMessageReliably(slaveConnManagerId, bufferMessage)
})
- val results = futures.map(f => Await.result(f, 1.second))
+ val results = futures.map(f => Await.result(f, awaitTime))
val finishTime = System.currentTimeMillis
Thread.sleep(5000)
val mb = size * results.size / 1024.0 / 1024.0
val ms = finishTime - startTime
- val resultStr = "Sent " + mb + " MB in " + ms + " ms at " + (mb / ms * 1000.0) + " MB/s"
+ val resultStr = thisConnManagerId + " Sent " + mb + " MB in " + ms + " ms at " + (mb / ms * 1000.0) + " MB/s"
logInfo(resultStr)
resultStr
}).collect()
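
The test now reads its four optional knobs positionally, falling back to defaults when an argument is absent. A standalone sketch of that parsing, factored into a helper for clarity; all names are illustrative, not from the patch:

object TestArgs {
  // args(0)/args(1) are the required master URL and slaves file
  def parse(args: Array[String], slaveCount: Int) = {
    val tasknum   = if (args.length > 2) args(2).toInt else slaveCount
    val sizeBytes = (if (args.length > 3) args(3).toInt else 10) * 1024 * 1024
    val count     = if (args.length > 4) args(4).toInt else 3
    val awaitSec  = if (args.length > 5) args(5).toInt else 600
    (tasknum, sizeBytes, count, awaitSec)
  }
  def main(args: Array[String]) {
    println(parse(Array("master", "slaves"), 4))          // all defaults
    println(parse(Array("m", "s", "8", "20", "5", "300"), 4)) // all overridden
  }
}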