From afc8f3cb9a7afe3249500a7d135b4a54bb3e58c4 Mon Sep 17 00:00:00 2001 From: qqsun8819 Date: Sun, 9 Feb 2014 13:57:29 -0800 Subject: Merge pull request #551 from qqsun8819/json-protocol. [SPARK-1038] Add more fields in JsonProtocol and add tests that verify the JSON itself This is a PR for SPARK-1038. Two major changes: 1 add some fields to JsonProtocol which is new and important to standalone-related data structures 2 Use Diff in liftweb.json to verity the stringified Json output for detecting someone mod type T to Option[T] Author: qqsun8819 Closes #551 and squashes the following commits: fdf0b4e [qqsun8819] [SPARK-1038] 1. Change code style for more readable according to rxin review 2. change submitdate hard-coded string to a date object toString for more complexiblity 095a26f [qqsun8819] [SPARK-1038] mod according to review of pwendel, use hard-coded json string for json data validation. Each test use its own json string 0524e41 [qqsun8819] Merge remote-tracking branch 'upstream/master' into json-protocol d203d5c [qqsun8819] [SPARK-1038] Add more fields in JsonProtocol and add tests that verify the JSON itself --- .../org/apache/spark/deploy/JsonProtocol.scala | 20 ++++- .../apache/spark/deploy/JsonProtocolSuite.scala | 97 ++++++++++++++++++++-- 2 files changed, 109 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala index e607b8c6f4..33e69371b8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala @@ -20,7 +20,7 @@ package org.apache.spark.deploy import net.liftweb.json.JsonDSL._ import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse} -import org.apache.spark.deploy.master.{ApplicationInfo, WorkerInfo} +import org.apache.spark.deploy.master.{ApplicationInfo, WorkerInfo, DriverInfo} import org.apache.spark.deploy.worker.ExecutorRunner @@ -32,9 +32,12 @@ private[spark] object JsonProtocol { ("webuiaddress" -> obj.webUiAddress) ~ ("cores" -> obj.cores) ~ ("coresused" -> obj.coresUsed) ~ + ("coresfree" -> obj.coresFree) ~ ("memory" -> obj.memory) ~ ("memoryused" -> obj.memoryUsed) ~ - ("state" -> obj.state.toString) + ("memoryfree" -> obj.memoryFree) ~ + ("state" -> obj.state.toString) ~ + ("lastheartbeat" -> obj.lastHeartbeat) } def writeApplicationInfo(obj: ApplicationInfo) = { @@ -54,7 +57,9 @@ private[spark] object JsonProtocol { ("name" -> obj.name) ~ ("cores" -> obj.maxCores) ~ ("memoryperslave" -> obj.memoryPerSlave) ~ - ("user" -> obj.user) + ("user" -> obj.user) ~ + ("sparkhome" -> obj.sparkHome) ~ + ("command" -> obj.command.toString) } def writeExecutorRunner(obj: ExecutorRunner) = { @@ -64,6 +69,14 @@ private[spark] object JsonProtocol { ("appdesc" -> writeApplicationDescription(obj.appDesc)) } + def writeDriverInfo(obj: DriverInfo) = { + ("id" -> obj.id) ~ + ("starttime" -> obj.startTime.toString) ~ + ("state" -> obj.state.toString) ~ + ("cores" -> obj.desc.cores) ~ + ("memory" -> obj.desc.mem) + } + def writeMasterState(obj: MasterStateResponse) = { ("url" -> obj.uri) ~ ("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~ @@ -73,6 +86,7 @@ private[spark] object JsonProtocol { ("memoryused" -> obj.workers.map(_.memoryUsed).sum) ~ ("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo)) ~ ("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo)) ~ + ("activedrivers" -> obj.activeDrivers.toList.map(writeDriverInfo)) ~ ("status" -> obj.status.toString) } diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index 693b1ab237..6445db0063 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -20,8 +20,9 @@ package org.apache.spark.deploy import java.io.File import java.util.Date +import net.liftweb.json.Diff import net.liftweb.json.{JsonAST, JsonParser} -import net.liftweb.json.JsonAST.JValue +import net.liftweb.json.JsonAST.{JNothing, JValue} import org.scalatest.FunSuite import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse} @@ -29,24 +30,35 @@ import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, RecoveryStat import org.apache.spark.deploy.worker.{ExecutorRunner, DriverRunner} class JsonProtocolSuite extends FunSuite { + test("writeApplicationInfo") { val output = JsonProtocol.writeApplicationInfo(createAppInfo()) assertValidJson(output) + assertValidDataInJson(output, JsonParser.parse(JsonConstants.appInfoJsonStr)) } test("writeWorkerInfo") { val output = JsonProtocol.writeWorkerInfo(createWorkerInfo()) assertValidJson(output) + assertValidDataInJson(output, JsonParser.parse(JsonConstants.workerInfoJsonStr)) } test("writeApplicationDescription") { val output = JsonProtocol.writeApplicationDescription(createAppDesc()) assertValidJson(output) + assertValidDataInJson(output, JsonParser.parse(JsonConstants.appDescJsonStr)) } test("writeExecutorRunner") { val output = JsonProtocol.writeExecutorRunner(createExecutorRunner()) assertValidJson(output) + assertValidDataInJson(output, JsonParser.parse(JsonConstants.executorRunnerJsonStr)) + } + + test("writeDriverInfo") { + val output = JsonProtocol.writeDriverInfo(createDriverInfo()) + assertValidJson(output) + assertValidDataInJson(output, JsonParser.parse(JsonConstants.driverInfoJsonStr)) } test("writeMasterState") { @@ -59,6 +71,7 @@ class JsonProtocolSuite extends FunSuite { activeDrivers, completedDrivers, RecoveryState.ALIVE) val output = JsonProtocol.writeMasterState(stateResponse) assertValidJson(output) + assertValidDataInJson(output, JsonParser.parse(JsonConstants.masterStateJsonStr)) } test("writeWorkerState") { @@ -70,6 +83,7 @@ class JsonProtocolSuite extends FunSuite { finishedExecutors, drivers, finishedDrivers, "masterUrl", 4, 1234, 4, 1234, "masterWebUiUrl") val output = JsonProtocol.writeWorkerState(stateResponse) assertValidJson(output) + assertValidDataInJson(output, JsonParser.parse(JsonConstants.workerStateJsonStr)) } def createAppDesc(): ApplicationDescription = { @@ -78,8 +92,10 @@ class JsonProtocolSuite extends FunSuite { } def createAppInfo() : ApplicationInfo = { - new ApplicationInfo( - 3, "id", createAppDesc(), new Date(123456789), null, "appUriStr", Int.MaxValue) + val appInfo = new ApplicationInfo(JsonConstants.appInfoStartTime, + "id", createAppDesc(), JsonConstants.submitDate, null, "appUriStr", Int.MaxValue) + appInfo.endTime = JsonConstants.currTimeInMillis + appInfo } def createDriverCommand() = new Command( @@ -90,10 +106,13 @@ class JsonProtocolSuite extends FunSuite { def createDriverDesc() = new DriverDescription("hdfs://some-dir/some.jar", 100, 3, false, createDriverCommand()) - def createDriverInfo(): DriverInfo = new DriverInfo(3, "driver-3", createDriverDesc(), new Date()) + def createDriverInfo(): DriverInfo = new DriverInfo(3, "driver-3", + createDriverDesc(), new Date()) def createWorkerInfo(): WorkerInfo = { - new WorkerInfo("id", "host", 8080, 4, 1234, null, 80, "publicAddress") + val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, 80, "publicAddress") + workerInfo.lastHeartbeat = JsonConstants.currTimeInMillis + workerInfo } def createExecutorRunner(): ExecutorRunner = { new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host", @@ -111,4 +130,72 @@ class JsonProtocolSuite extends FunSuite { case e: JsonParser.ParseException => fail("Invalid Json detected", e) } } + + def assertValidDataInJson(validateJson: JValue, expectedJson: JValue) { + val Diff(c, a, d) = validateJson diff expectedJson + assert(c === JNothing, "Json changed") + assert(a === JNothing, "Json added") + assert(d === JNothing, "Json deleted") + } +} + +object JsonConstants { + val currTimeInMillis = System.currentTimeMillis() + val appInfoStartTime = 3 + val submitDate = new Date(123456789) + val appInfoJsonStr = + """ + |{"starttime":3,"id":"id","name":"name","appuiurl":"appUriStr", + |"cores":4,"user":"%s", + |"memoryperslave":1234,"submitdate":"%s", + |"state":"WAITING","duration":%d} + """.format(System.getProperty("user.name", ""), + submitDate.toString, (currTimeInMillis - appInfoStartTime)).stripMargin + + val workerInfoJsonStr = + """ + |{"id":"id","host":"host","port":8080, + |"webuiaddress":"http://publicAddress:80", + |"cores":4,"coresused":0,"coresfree":4, + |"memory":1234,"memoryused":0,"memoryfree":1234, + |"state":"ALIVE","lastheartbeat":%d} + """.format(currTimeInMillis).stripMargin + + val appDescJsonStr = + """ + |{"name":"name","cores":4,"memoryperslave":1234, + |"user":"%s","sparkhome":"sparkHome", + |"command":"Command(mainClass,List(arg1, arg2),Map())"} + """.format(System.getProperty("user.name", "")).stripMargin + + val executorRunnerJsonStr = + """ + |{"id":123,"memory":1234,"appid":"appId", + |"appdesc":%s} + """.format(appDescJsonStr).stripMargin + + val driverInfoJsonStr = + """ + |{"id":"driver-3","starttime":"3","state":"SUBMITTED","cores":3,"memory":100} + """.stripMargin + + val masterStateJsonStr = + """ + |{"url":"spark://host:8080", + |"workers":[%s,%s], + |"cores":8,"coresused":0,"memory":2468,"memoryused":0, + |"activeapps":[%s],"completedapps":[], + |"activedrivers":[%s], + |"status":"ALIVE"} + """.format(workerInfoJsonStr, workerInfoJsonStr, + appInfoJsonStr, driverInfoJsonStr).stripMargin + + val workerStateJsonStr = + """ + |{"id":"workerId","masterurl":"masterUrl", + |"masterwebuiurl":"masterWebUiUrl", + |"cores":4,"coresused":4,"memory":1234,"memoryused":1234, + |"executors":[], + |"finishedexecutors":[%s,%s]} + """.format(executorRunnerJsonStr, executorRunnerJsonStr).stripMargin } -- cgit v1.2.3