From db1c748be84c29bc483195439a21e2b9d44da63b Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Mon, 6 Mar 2017 03:30:41 -0800 Subject: Basic http api --- crashbox-server/src/main/resources/reference.conf | 3 + .../src/main/scala/io/crashbox/ci/Core.scala | 2 + .../src/main/scala/io/crashbox/ci/HttpApi.scala | 76 ++++++++++++++++++++++ .../src/main/scala/io/crashbox/ci/Main.scala | 24 ++++--- .../src/main/scala/io/crashbox/ci/Schedulers.scala | 3 +- 5 files changed, 98 insertions(+), 10 deletions(-) create mode 100644 crashbox-server/src/main/scala/io/crashbox/ci/HttpApi.scala (limited to 'crashbox-server/src/main') diff --git a/crashbox-server/src/main/resources/reference.conf b/crashbox-server/src/main/resources/reference.conf index a156b88..bfcd645 100644 --- a/crashbox-server/src/main/resources/reference.conf +++ b/crashbox-server/src/main/resources/reference.conf @@ -1,5 +1,8 @@ crashbox { + host = "[::]" + port = 9000 + blocking-dispatcher { type = Dispatcher executor = "thread-pool-executor" diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Core.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Core.scala index 5a30df0..ed9ff14 100644 --- a/crashbox-server/src/main/scala/io/crashbox/ci/Core.scala +++ b/crashbox-server/src/main/scala/io/crashbox/ci/Core.scala @@ -1,12 +1,14 @@ package io.crashbox.ci import akka.actor.ActorSystem +import akka.stream.ActorMaterializer import scala.concurrent.duration.Duration import scala.concurrent.{Await, ExecutionContext} trait Core { implicit val system: ActorSystem = ActorSystem("crashbox") + implicit val materializer = ActorMaterializer() implicit val executionContext: ExecutionContext = system.dispatcher val blockingDispatcher: ExecutionContext = system.dispatchers.lookup("crashbox.blocking-dispatcher") diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/HttpApi.scala b/crashbox-server/src/main/scala/io/crashbox/ci/HttpApi.scala new file mode 100644 index 0000000..22feba5 --- /dev/null +++ b/crashbox-server/src/main/scala/io/crashbox/ci/HttpApi.scala @@ -0,0 +1,76 @@ +package io.crashbox.ci + +import akka.actor.ActorSystem +import akka.http.scaladsl.Http +import akka.http.scaladsl.marshalling.Marshaller +import akka.http.scaladsl.marshalling.ToResponseMarshaller +import akka.http.scaladsl.model.{ + ContentType, + ContentTypes, + HttpEntity, + MediaTypes +} +import akka.http.scaladsl.model.HttpEntity.ChunkStreamPart +import akka.http.scaladsl.model.HttpResponse +import akka.http.scaladsl.model.ws.TextMessage +import akka.http.scaladsl.server.directives.Credentials +import akka.stream.scaladsl.{Flow, Keep, Sink} +import akka.stream.{ActorMaterializer, OverflowStrategy} +import akka.stream.scaladsl.{Source => Src} +import java.net.URL +import java.security.MessageDigest +import java.text.SimpleDateFormat +import java.util.Date +import scala.util.{Failure, Success} +import akka.http.scaladsl.server._ +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport +import Directives._ +import SprayJsonSupport._ +import spray.json.{DefaultJsonProtocol, JsObject, JsString, JsValue} + +trait HttpApi { self: Core with Schedulers with StreamStore => + + val endpoint = "api" + + case class Request(url: String) { + def buildId: String = { + val bytes = MessageDigest.getInstance("SHA-256").digest(url.getBytes) + bytes.map { byte => + Integer.toString((byte & 0xff) + 0x100, 16) + }.mkString + } + } + + object Protocol extends DefaultJsonProtocol { + implicit val request = jsonFormat1(Request) + } + import Protocol._ + + implicit val toResponseMarshaller: ToResponseMarshaller[Src[BuildState, Any]] = + Marshaller.opaque { items => + val data = items.map(item => ChunkStreamPart(item.toString + "\n")) + HttpResponse( + entity = HttpEntity.Chunked(ContentTypes.`text/plain(UTF-8)`, data)) + } + + def httpApi: Route = pathPrefix(endpoint) { + path("submit") { + entity(as[Request]) { req => + val source = Src + .queue[BuildState](100, OverflowStrategy.fail) + .mapMaterializedValue { q => + start( + req.buildId, + new URL(req.url), + () => saveStream(req.buildId), + state => q.offer(state) + ) + } + + complete(source) + } + + } + } + +} diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Main.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Main.scala index 0187751..d2e886b 100644 --- a/crashbox-server/src/main/scala/io/crashbox/ci/Main.scala +++ b/crashbox-server/src/main/scala/io/crashbox/ci/Main.scala @@ -1,6 +1,9 @@ package io.crashbox.ci +import akka.http.scaladsl.Http +import akka.stream.ActorMaterializer import java.net.URL +import scala.util.{Failure, Success} object Main extends Core @@ -8,19 +11,22 @@ object Main with Builders with Parsers with Source - with StreamStore { + with StreamStore + with HttpApi { def main(args: Array[String]): Unit = { reapDeadBuilds() - start( - "random_build", - new URL("file:///home/jodersky/tmp/dummy"), - () => saveStream("random_build"), - state => println(state) - ) - Thread.sleep(15000) - System.exit(0) + val host = system.settings.config.getString("crashbox.host") + val port = system.settings.config.getInt("crashbox.port") + + Http(system).bindAndHandle(httpApi, host, port) onComplete { + case Success(_) => + system.log.info(s"Listening on $host:$port") + case Failure(ex) => + system.log.error(ex, s"Failed to bind to $host:$port") + system.terminate() + } } } diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Schedulers.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Schedulers.scala index e9f2a82..01c278d 100644 --- a/crashbox-server/src/main/scala/io/crashbox/ci/Schedulers.scala +++ b/crashbox-server/src/main/scala/io/crashbox/ci/Schedulers.scala @@ -52,10 +52,11 @@ trait Schedulers extends { self: Core with Source with Builders with Parsers => containerId foreach { cancelBuild(_) } out foreach { _.close() } buildDir foreach { _.delete() } + log.info(s"Stopped build of $url") } override def preStart() = { - log.info(s"Started build manager for $url") + log.info(s"Started build of $url") self ! Cloning(url) } -- cgit v1.2.3