aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala20
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala97
2 files changed, 109 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
index e607b8c6f4..33e69371b8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
@@ -20,7 +20,7 @@ package org.apache.spark.deploy
import net.liftweb.json.JsonDSL._
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
-import org.apache.spark.deploy.master.{ApplicationInfo, WorkerInfo}
+import org.apache.spark.deploy.master.{ApplicationInfo, WorkerInfo, DriverInfo}
import org.apache.spark.deploy.worker.ExecutorRunner
@@ -32,9 +32,12 @@ private[spark] object JsonProtocol {
("webuiaddress" -> obj.webUiAddress) ~
("cores" -> obj.cores) ~
("coresused" -> obj.coresUsed) ~
+ ("coresfree" -> obj.coresFree) ~
("memory" -> obj.memory) ~
("memoryused" -> obj.memoryUsed) ~
- ("state" -> obj.state.toString)
+ ("memoryfree" -> obj.memoryFree) ~
+ ("state" -> obj.state.toString) ~
+ ("lastheartbeat" -> obj.lastHeartbeat)
}
def writeApplicationInfo(obj: ApplicationInfo) = {
@@ -54,7 +57,9 @@ private[spark] object JsonProtocol {
("name" -> obj.name) ~
("cores" -> obj.maxCores) ~
("memoryperslave" -> obj.memoryPerSlave) ~
- ("user" -> obj.user)
+ ("user" -> obj.user) ~
+ ("sparkhome" -> obj.sparkHome) ~
+ ("command" -> obj.command.toString)
}
def writeExecutorRunner(obj: ExecutorRunner) = {
@@ -64,6 +69,14 @@ private[spark] object JsonProtocol {
("appdesc" -> writeApplicationDescription(obj.appDesc))
}
+ def writeDriverInfo(obj: DriverInfo) = {
+ ("id" -> obj.id) ~
+ ("starttime" -> obj.startTime.toString) ~
+ ("state" -> obj.state.toString) ~
+ ("cores" -> obj.desc.cores) ~
+ ("memory" -> obj.desc.mem)
+ }
+
def writeMasterState(obj: MasterStateResponse) = {
("url" -> obj.uri) ~
("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~
@@ -73,6 +86,7 @@ private[spark] object JsonProtocol {
("memoryused" -> obj.workers.map(_.memoryUsed).sum) ~
("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo)) ~
("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo)) ~
+ ("activedrivers" -> obj.activeDrivers.toList.map(writeDriverInfo)) ~
("status" -> obj.status.toString)
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 693b1ab237..6445db0063 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -20,8 +20,9 @@ package org.apache.spark.deploy
import java.io.File
import java.util.Date
+import net.liftweb.json.Diff
import net.liftweb.json.{JsonAST, JsonParser}
-import net.liftweb.json.JsonAST.JValue
+import net.liftweb.json.JsonAST.{JNothing, JValue}
import org.scalatest.FunSuite
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
@@ -29,24 +30,35 @@ import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, RecoveryStat
import org.apache.spark.deploy.worker.{ExecutorRunner, DriverRunner}
class JsonProtocolSuite extends FunSuite {
+
test("writeApplicationInfo") {
val output = JsonProtocol.writeApplicationInfo(createAppInfo())
assertValidJson(output)
+ assertValidDataInJson(output, JsonParser.parse(JsonConstants.appInfoJsonStr))
}
test("writeWorkerInfo") {
val output = JsonProtocol.writeWorkerInfo(createWorkerInfo())
assertValidJson(output)
+ assertValidDataInJson(output, JsonParser.parse(JsonConstants.workerInfoJsonStr))
}
test("writeApplicationDescription") {
val output = JsonProtocol.writeApplicationDescription(createAppDesc())
assertValidJson(output)
+ assertValidDataInJson(output, JsonParser.parse(JsonConstants.appDescJsonStr))
}
test("writeExecutorRunner") {
val output = JsonProtocol.writeExecutorRunner(createExecutorRunner())
assertValidJson(output)
+ assertValidDataInJson(output, JsonParser.parse(JsonConstants.executorRunnerJsonStr))
+ }
+
+ test("writeDriverInfo") {
+ val output = JsonProtocol.writeDriverInfo(createDriverInfo())
+ assertValidJson(output)
+ assertValidDataInJson(output, JsonParser.parse(JsonConstants.driverInfoJsonStr))
}
test("writeMasterState") {
@@ -59,6 +71,7 @@ class JsonProtocolSuite extends FunSuite {
activeDrivers, completedDrivers, RecoveryState.ALIVE)
val output = JsonProtocol.writeMasterState(stateResponse)
assertValidJson(output)
+ assertValidDataInJson(output, JsonParser.parse(JsonConstants.masterStateJsonStr))
}
test("writeWorkerState") {
@@ -70,6 +83,7 @@ class JsonProtocolSuite extends FunSuite {
finishedExecutors, drivers, finishedDrivers, "masterUrl", 4, 1234, 4, 1234, "masterWebUiUrl")
val output = JsonProtocol.writeWorkerState(stateResponse)
assertValidJson(output)
+ assertValidDataInJson(output, JsonParser.parse(JsonConstants.workerStateJsonStr))
}
def createAppDesc(): ApplicationDescription = {
@@ -78,8 +92,10 @@ class JsonProtocolSuite extends FunSuite {
}
def createAppInfo() : ApplicationInfo = {
- new ApplicationInfo(
- 3, "id", createAppDesc(), new Date(123456789), null, "appUriStr", Int.MaxValue)
+ val appInfo = new ApplicationInfo(JsonConstants.appInfoStartTime,
+ "id", createAppDesc(), JsonConstants.submitDate, null, "appUriStr", Int.MaxValue)
+ appInfo.endTime = JsonConstants.currTimeInMillis
+ appInfo
}
def createDriverCommand() = new Command(
@@ -90,10 +106,13 @@ class JsonProtocolSuite extends FunSuite {
def createDriverDesc() = new DriverDescription("hdfs://some-dir/some.jar", 100, 3,
false, createDriverCommand())
- def createDriverInfo(): DriverInfo = new DriverInfo(3, "driver-3", createDriverDesc(), new Date())
+ def createDriverInfo(): DriverInfo = new DriverInfo(3, "driver-3",
+ createDriverDesc(), new Date())
def createWorkerInfo(): WorkerInfo = {
- new WorkerInfo("id", "host", 8080, 4, 1234, null, 80, "publicAddress")
+ val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, 80, "publicAddress")
+ workerInfo.lastHeartbeat = JsonConstants.currTimeInMillis
+ workerInfo
}
def createExecutorRunner(): ExecutorRunner = {
new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host",
@@ -111,4 +130,72 @@ class JsonProtocolSuite extends FunSuite {
case e: JsonParser.ParseException => fail("Invalid Json detected", e)
}
}
+
+ def assertValidDataInJson(validateJson: JValue, expectedJson: JValue) {
+ val Diff(c, a, d) = validateJson diff expectedJson
+ assert(c === JNothing, "Json changed")
+ assert(a === JNothing, "Json added")
+ assert(d === JNothing, "Json deleted")
+ }
+}
+
+object JsonConstants {
+ val currTimeInMillis = System.currentTimeMillis()
+ val appInfoStartTime = 3
+ val submitDate = new Date(123456789)
+ val appInfoJsonStr =
+ """
+ |{"starttime":3,"id":"id","name":"name","appuiurl":"appUriStr",
+ |"cores":4,"user":"%s",
+ |"memoryperslave":1234,"submitdate":"%s",
+ |"state":"WAITING","duration":%d}
+ """.format(System.getProperty("user.name", "<unknown>"),
+ submitDate.toString, (currTimeInMillis - appInfoStartTime)).stripMargin
+
+ val workerInfoJsonStr =
+ """
+ |{"id":"id","host":"host","port":8080,
+ |"webuiaddress":"http://publicAddress:80",
+ |"cores":4,"coresused":0,"coresfree":4,
+ |"memory":1234,"memoryused":0,"memoryfree":1234,
+ |"state":"ALIVE","lastheartbeat":%d}
+ """.format(currTimeInMillis).stripMargin
+
+ val appDescJsonStr =
+ """
+ |{"name":"name","cores":4,"memoryperslave":1234,
+ |"user":"%s","sparkhome":"sparkHome",
+ |"command":"Command(mainClass,List(arg1, arg2),Map())"}
+ """.format(System.getProperty("user.name", "<unknown>")).stripMargin
+
+ val executorRunnerJsonStr =
+ """
+ |{"id":123,"memory":1234,"appid":"appId",
+ |"appdesc":%s}
+ """.format(appDescJsonStr).stripMargin
+
+ val driverInfoJsonStr =
+ """
+ |{"id":"driver-3","starttime":"3","state":"SUBMITTED","cores":3,"memory":100}
+ """.stripMargin
+
+ val masterStateJsonStr =
+ """
+ |{"url":"spark://host:8080",
+ |"workers":[%s,%s],
+ |"cores":8,"coresused":0,"memory":2468,"memoryused":0,
+ |"activeapps":[%s],"completedapps":[],
+ |"activedrivers":[%s],
+ |"status":"ALIVE"}
+ """.format(workerInfoJsonStr, workerInfoJsonStr,
+ appInfoJsonStr, driverInfoJsonStr).stripMargin
+
+ val workerStateJsonStr =
+ """
+ |{"id":"workerId","masterurl":"masterUrl",
+ |"masterwebuiurl":"masterWebUiUrl",
+ |"cores":4,"coresused":4,"memory":1234,"memoryused":1234,
+ |"executors":[],
+ |"finishedexecutors":[%s,%s]}
+ """.format(executorRunnerJsonStr, executorRunnerJsonStr).stripMargin
}