aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-06-18 15:13:12 -0700
committerPatrick Wendell <pwendell@gmail.com>2013-06-22 10:31:36 -0700
commit91ec5a1a04339983d57a72d8df8f1d769d8d855a (patch)
treecab5e5b370b4a107b883ecfe9b5698ce7071abfb
parentfc94576ece99f2b224a951b32f0f6360701b7cd3 (diff)
downloadspark-91ec5a1a04339983d57a72d8df8f1d769d8d855a.tar.gz
spark-91ec5a1a04339983d57a72d8df8f1d769d8d855a.tar.bz2
spark-91ec5a1a04339983d57a72d8df8f1d769d8d855a.zip
Changing JSON protocol and removing spray code
-rw-r--r--core/src/main/scala/spark/deploy/JsonProtocol.scala114
-rw-r--r--core/src/main/scala/spark/deploy/master/MasterWebUI.scala6
-rw-r--r--core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala6
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerUI.scala11
-rw-r--r--core/src/main/scala/spark/util/AkkaUtils.scala15
-rw-r--r--core/src/main/scala/spark/util/WebUI.scala12
-rw-r--r--project/SparkBuild.scala7
-rw-r--r--project/plugins.sbt2
8 files changed, 66 insertions, 107 deletions
diff --git a/core/src/main/scala/spark/deploy/JsonProtocol.scala b/core/src/main/scala/spark/deploy/JsonProtocol.scala
index ea832101d2..b4365d31e9 100644
--- a/core/src/main/scala/spark/deploy/JsonProtocol.scala
+++ b/core/src/main/scala/spark/deploy/JsonProtocol.scala
@@ -1,79 +1,65 @@
package spark.deploy
import master.{ApplicationInfo, WorkerInfo}
+import net.liftweb.json.JsonDSL._
import worker.ExecutorRunner
-import cc.spray.json._
-/**
- * spray-json helper class containing implicit conversion to json for marshalling responses
- */
-private[spark] object JsonProtocol extends DefaultJsonProtocol {
- implicit object WorkerInfoJsonFormat extends RootJsonWriter[WorkerInfo] {
- def write(obj: WorkerInfo) = JsObject(
- "id" -> JsString(obj.id),
- "host" -> JsString(obj.host),
- "port" -> JsNumber(obj.port),
- "webuiaddress" -> JsString(obj.webUiAddress),
- "cores" -> JsNumber(obj.cores),
- "coresused" -> JsNumber(obj.coresUsed),
- "memory" -> JsNumber(obj.memory),
- "memoryused" -> JsNumber(obj.memoryUsed)
- )
- }
+object JsonProtocol {
+ def writeWorkerInfo(obj: WorkerInfo) = {
+ ("id" -> obj.id) ~
+ ("host" -> obj.host) ~
+ ("port" -> obj.port) ~
+ ("webuiaddress" -> obj.webUiAddress) ~
+ ("cores" -> obj.cores) ~
+ ("coresused" -> obj.coresUsed) ~
+ ("memory" -> obj.memory) ~
+ ("memoryused" -> obj.memoryUsed)
+ }
- implicit object AppInfoJsonFormat extends RootJsonWriter[ApplicationInfo] {
- def write(obj: ApplicationInfo) = JsObject(
- "starttime" -> JsNumber(obj.startTime),
- "id" -> JsString(obj.id),
- "name" -> JsString(obj.desc.name),
- "cores" -> JsNumber(obj.desc.maxCores),
- "user" -> JsString(obj.desc.user),
- "memoryperslave" -> JsNumber(obj.desc.memoryPerSlave),
- "submitdate" -> JsString(obj.submitDate.toString))
+ def writeApplicationInfo(obj: ApplicationInfo) = {
+ ("starttime" -> obj.startTime) ~
+ ("id" -> obj.id) ~
+ ("name" -> obj.desc.name) ~
+ ("cores" -> obj.desc.maxCores) ~
+ ("user" -> obj.desc.user) ~
+ ("memoryperslave" -> obj.desc.memoryPerSlave) ~
+ ("submitdate" -> obj.submitDate.toString)
}
- implicit object AppDescriptionJsonFormat extends RootJsonWriter[ApplicationDescription] {
- def write(obj: ApplicationDescription) = JsObject(
- "name" -> JsString(obj.name),
- "cores" -> JsNumber(obj.maxCores),
- "memoryperslave" -> JsNumber(obj.memoryPerSlave),
- "user" -> JsString(obj.user)
- )
+ def writeApplicationDescription(obj: ApplicationDescription) = {
+ ("name" -> obj.name) ~
+ ("cores" -> obj.maxCores) ~
+ ("memoryperslave" -> obj.memoryPerSlave) ~
+ ("user" -> obj.user)
}
- implicit object ExecutorRunnerJsonFormat extends RootJsonWriter[ExecutorRunner] {
- def write(obj: ExecutorRunner) = JsObject(
- "id" -> JsNumber(obj.execId),
- "memory" -> JsNumber(obj.memory),
- "appid" -> JsString(obj.appId),
- "appdesc" -> obj.appDesc.toJson.asJsObject
- )
+ def writeExecutorRunner(obj: ExecutorRunner) = {
+ ("id" -> obj.execId) ~
+ ("memory" -> obj.memory) ~
+ ("appid" -> obj.appId) ~
+ ("appdesc" -> writeApplicationDescription(obj.appDesc))
}
- implicit object MasterStateJsonFormat extends RootJsonWriter[MasterState] {
- def write(obj: MasterState) = JsObject(
- "url" -> JsString("spark://" + obj.uri),
- "workers" -> JsArray(obj.workers.toList.map(_.toJson)),
- "cores" -> JsNumber(obj.workers.map(_.cores).sum),
- "coresused" -> JsNumber(obj.workers.map(_.coresUsed).sum),
- "memory" -> JsNumber(obj.workers.map(_.memory).sum),
- "memoryused" -> JsNumber(obj.workers.map(_.memoryUsed).sum),
- "activeapps" -> JsArray(obj.activeApps.toList.map(_.toJson)),
- "completedapps" -> JsArray(obj.completedApps.toList.map(_.toJson))
- )
+ def writeMasterState(obj: MasterState) = {
+ ("url" -> ("spark://" + obj.uri)) ~
+ ("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~
+ ("cores" -> obj.workers.map(_.cores).sum) ~
+ ("coresused" -> obj.workers.map(_.coresUsed).sum) ~
+ ("memory" -> obj.workers.map(_.memory).sum) ~
+ ("memoryused" -> obj.workers.map(_.memoryUsed).sum) ~
+ ("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo)) ~
+ ("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo))
}
- implicit object WorkerStateJsonFormat extends RootJsonWriter[WorkerState] {
- def write(obj: WorkerState) = JsObject(
- "id" -> JsString(obj.workerId),
- "masterurl" -> JsString(obj.masterUrl),
- "masterwebuiurl" -> JsString(obj.masterWebUiUrl),
- "cores" -> JsNumber(obj.cores),
- "coresused" -> JsNumber(obj.coresUsed),
- "memory" -> JsNumber(obj.memory),
- "memoryused" -> JsNumber(obj.memoryUsed),
- "executors" -> JsArray(obj.executors.toList.map(_.toJson)),
- "finishedexecutors" -> JsArray(obj.finishedExecutors.toList.map(_.toJson))
- )
+ def writeWorkerState(obj: WorkerState) = {
+ ("id" -> obj.workerId) ~
+ ("masterurl" -> obj.masterUrl) ~
+ ("masterwebuiurl" -> obj.masterWebUiUrl) ~
+ ("cores" -> obj.cores) ~
+ ("coresused" -> obj.coresUsed) ~
+ ("memory" -> obj.memory) ~
+ ("memoryused" -> obj.memoryUsed) ~
+ ("executors" -> obj.executors.toList.map(writeExecutorRunner)) ~
+ ("finishedexecutors" -> obj.finishedExecutors.toList.map(writeExecutorRunner))
}
-}
+} \ No newline at end of file
diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
index a2e9dfd762..6623142d69 100644
--- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
@@ -55,9 +55,9 @@ class MasterWebUI(master: ActorRef) extends Logging {
<div class="row">
<div class="span12">
<ul class="unstyled">
- <li><strong>ID:</strong> app.id</li>
- <li><strong>Description:</strong> app.desc.name</li>
- <li><strong>User:</strong> app.desc.user</li>
+ <li><strong>ID:</strong> {app.id}</li>
+ <li><strong>Description:</strong> {app.desc.name}</li>
+ <li><strong>User:</strong> {app.desc.user}</li>
<li><strong>Cores:</strong>
{
if (app.desc.maxCores == Integer.MAX_VALUE) {
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
index b8b4b89738..0af9eb8efa 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
@@ -1,16 +1,12 @@
package spark.deploy.worker
-import akka.actor.{ActorRef, ActorSystem}
+import akka.actor.ActorRef
import akka.dispatch.Await
import akka.pattern.ask
import akka.util.{Duration, Timeout}
import akka.util.duration._
-import cc.spray.Directives
-import cc.spray.typeconversion.TwirlSupport._
-import cc.spray.http.MediaTypes
import spark.deploy.{WorkerState, RequestWorkerState}
-import spark.deploy.JsonProtocol._
import java.io.File
import spark.util.{WebUI => UtilsWebUI}
import spark.{Utils, Logging}
diff --git a/core/src/main/scala/spark/storage/BlockManagerUI.scala b/core/src/main/scala/spark/storage/BlockManagerUI.scala
index 6ac4398de7..e9c362fce7 100644
--- a/core/src/main/scala/spark/storage/BlockManagerUI.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerUI.scala
@@ -3,18 +3,12 @@ package spark.storage
import akka.actor.{ActorRef, ActorSystem}
import akka.util.Duration
import akka.util.duration._
-import cc.spray.typeconversion.TwirlSupport._
-import cc.spray.Directives
import spark.{Logging, SparkContext}
-import spark.util.AkkaUtils
import spark.Utils
import spark.util.WebUI
-import org.eclipse.jetty.server.handler.{HandlerList, ContextHandler, ResourceHandler}
import org.eclipse.jetty.server.Handler
-import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
-import xml.Elem
+import javax.servlet.http.HttpServletRequest
import xml.Node
-import java.net.URLClassLoader
import spark.util.WebUI._
@@ -23,7 +17,7 @@ import spark.util.WebUI._
*/
private[spark]
class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef, sc: SparkContext)
- extends Directives with Logging {
+ extends Logging {
implicit val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
val host = Utils.localHostName()
@@ -55,7 +49,6 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef,
val filteredStorageStatusList = StorageUtils.
filterStorageStatusByPrefix(storageStatusList, prefix)
val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
- spark.storage.html.rdd.render(rddInfo, filteredStorageStatusList)
val content =
<div class="row">
diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala
index bd2d637ae7..134c912c46 100644
--- a/core/src/main/scala/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/spark/util/AkkaUtils.scala
@@ -1,21 +1,10 @@
package spark.util
-import akka.actor.{ActorRef, Props, ActorSystemImpl, ActorSystem}
+import akka.actor.{ActorSystemImpl, ActorSystem}
import com.typesafe.config.ConfigFactory
import akka.util.duration._
-import akka.pattern.ask
import akka.remote.RemoteActorRefProvider
-import cc.spray.Route
-import cc.spray.io.IoWorker
-import cc.spray.{SprayCanRootService, HttpService}
-import cc.spray.can.server.HttpServer
-import cc.spray.io.pipelines.MessageHandlerDispatch.SingletonHandler
-import akka.dispatch.Await
-import spark.{Utils, SparkException}
-import java.util.concurrent.TimeoutException
-import org.eclipse.jetty.server.Server
-import org.eclipse.jetty.server.Handler
-import org.eclipse.jetty.server.handler.{HandlerList, ContextHandler}
+
/**
* Various utility classes for working with Akka.
diff --git a/core/src/main/scala/spark/util/WebUI.scala b/core/src/main/scala/spark/util/WebUI.scala
index 34b776f1d8..e6b39b15eb 100644
--- a/core/src/main/scala/spark/util/WebUI.scala
+++ b/core/src/main/scala/spark/util/WebUI.scala
@@ -1,23 +1,22 @@
package spark.util
-import xml.Elem
import xml.Node
-import util.parsing.json.{JSONFormat, JSONObject}
import org.eclipse.jetty.server.{Server, Request, Handler}
import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
-import org.eclipse.jetty.util.component.LifeCycle.Listener
import org.eclipse.jetty.server.handler.{ResourceHandler, HandlerList, ContextHandler, AbstractHandler}
import util.Try
import util.Success
import util.Failure
import spark.Logging
import annotation.tailrec
+import net.liftweb.json.JsonAST.JValue
+import net.liftweb.json._
object WebUI extends Logging {
type Responder[T] = HttpServletRequest => T
- implicit def jsonResponderToHandler(responder: Responder[JSONObject]): Handler =
- createHandler(responder, "text/json")
+ implicit def jsonResponderToHandler(responder: Responder[JValue]): Handler =
+ createHandler(responder, "text/json", (in: JValue) => pretty(render(in)))
implicit def htmlResponderToHandler(responder: Responder[Seq[Node]]): Handler =
createHandler(responder, "text/html")
@@ -25,7 +24,8 @@ object WebUI extends Logging {
implicit def textResponderToHandler(responder: Responder[String]): Handler =
createHandler(responder, "text/plain")
- def createHandler[T <% AnyRef](responder: Responder[T], contentType: String): Handler = {
+ def createHandler[T <% AnyRef](responder: Responder[T], contentType: String,
+ extractFn: T => String = (in: Any) => in.toString): Handler = {
new AbstractHandler {
def handle(target: String,
baseRequest: Request,
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index faf6e2ae8e..ec26b2a229 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -4,7 +4,6 @@ import sbt.Classpaths.publishTask
import Keys._
import sbtassembly.Plugin._
import AssemblyKeys._
-import twirl.sbt.TwirlPlugin._
// For Sonatype publishing
//import com.jsuereth.pgp.sbtplugin.PgpKeys._
@@ -157,9 +156,7 @@ object SparkBuild extends Build {
"com.typesafe.akka" % "akka-slf4j" % "2.0.3" excludeAll(excludeNetty),
"it.unimi.dsi" % "fastutil" % "6.4.4",
"colt" % "colt" % "1.2.0",
- "cc.spray" % "spray-can" % "1.0-M2.1" excludeAll(excludeNetty),
- "cc.spray" % "spray-server" % "1.0-M2.1" excludeAll(excludeNetty),
- "cc.spray" % "spray-json_2.9.2" % "1.1.1" excludeAll(excludeNetty),
+ "net.liftweb" % "lift-json_2.9.2" % "2.5",
"org.apache.mesos" % "mesos" % "0.9.0-incubating",
"io.netty" % "netty-all" % "4.0.0.Beta2",
"org.apache.derby" % "derby" % "10.4.2.0" % "test"
@@ -189,7 +186,7 @@ object SparkBuild extends Build {
"src/hadoop" + HADOOP_MAJOR_VERSION + "/scala"
} )
}
- ) ++ assemblySettings ++ extraAssemblySettings ++ Twirl.settings
+ ) ++ assemblySettings ++ extraAssemblySettings
def rootSettings = sharedSettings ++ Seq(
publish := {}
diff --git a/project/plugins.sbt b/project/plugins.sbt
index f806e66481..1b0f879b94 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -10,8 +10,6 @@ addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.1.1")
addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.2.0")
-addSbtPlugin("io.spray" %% "sbt-twirl" % "0.6.1")
-
// For Sonatype publishing
//resolvers += Resolver.url("sbt-plugin-releases", new URL("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases/"))(Resolver.ivyStylePatterns)