diff options
-rwxr-xr-x | build | 9 | ||||
-rw-r--r-- | crashbox-server/build.sbt | 2 | ||||
-rw-r--r-- | crashbox-server/src/main/resources/reference.conf | 7 | ||||
-rw-r--r-- | crashbox-server/src/main/scala/io/crashbox/ci/Core.scala | 10 | ||||
-rw-r--r-- | crashbox-server/src/main/scala/io/crashbox/ci/Executors.scala (renamed from crashbox-server/src/main/scala/io/crashbox/ci/Builders.scala) | 36 | ||||
-rw-r--r-- | crashbox-server/src/main/scala/io/crashbox/ci/HttpApi.scala | 55 | ||||
-rw-r--r-- | crashbox-server/src/main/scala/io/crashbox/ci/Main.scala | 13 | ||||
-rw-r--r-- | crashbox-server/src/main/scala/io/crashbox/ci/Schedulers.scala | 85 | ||||
-rw-r--r-- | crashbox-server/src/main/scala/io/crashbox/ci/StateStore.scala | 3 | ||||
-rw-r--r-- | crashbox-server/src/main/scala/io/crashbox/ci/Storage.scala | 62 | ||||
-rw-r--r-- | crashbox-server/src/main/scala/io/crashbox/ci/StreamStore.scala | 38 | ||||
-rw-r--r-- | crashbox-server/src/test/scala/io/crashbox/ci/TestUtil.scala | 2 | ||||
-rw-r--r-- | project/Dependencies.scala | 3 |
13 files changed, 180 insertions, 145 deletions
@@ -3,13 +3,18 @@ case "$1" in start) shift - curl -i -X POST -H "Content-Type: application/json" "http://localhost:9000/api/submit" \ + curl -i -sS -X POST -H "Content-Type: application/json" "http://localhost:9000/api/submit" \ -d "{\"url\": \"$1\"}" ;; cancel) shift - curl -i -X POST "http://localhost:9000/api/$1/cancel" + curl -i -sS -X POST "http://localhost:9000/api/$1/cancel" ;; + logs) + shift + curl -i -sS -X GET "http://localhost:9000/api/$1/logs" + ;; + *) echo "invalid command $1" >&2 exit 2 diff --git a/crashbox-server/build.sbt b/crashbox-server/build.sbt index 3de6438..ed4179d 100644 --- a/crashbox-server/build.sbt +++ b/crashbox-server/build.sbt @@ -9,5 +9,7 @@ libraryDependencies ++= Seq( Dependencies.jgitArchive, Dependencies.jgitServer, Dependencies.dockerClient, + Dependencies.slick, + "com.h2database" % "h2" % "1.4.193", Dependencies.scalatest % Test ) diff --git a/crashbox-server/src/main/resources/reference.conf b/crashbox-server/src/main/resources/reference.conf index bfcd645..3ac6de7 100644 --- a/crashbox-server/src/main/resources/reference.conf +++ b/crashbox-server/src/main/resources/reference.conf @@ -15,6 +15,13 @@ crashbox { streams { directory = "streams" } + + db { + url = "jdbc:h2:mem:test1" + driver = org.h2.Driver + connectionPool = disabled + keepAliveConnection = true + } } diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Core.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Core.scala index 78a4252..39a1aec 100644 --- a/crashbox-server/src/main/scala/io/crashbox/ci/Core.scala +++ b/crashbox-server/src/main/scala/io/crashbox/ci/Core.scala @@ -4,7 +4,9 @@ import scala.concurrent.{Await, ExecutionContext} import scala.concurrent.duration.Duration import akka.actor.ActorSystem +import akka.event.LoggingAdapter import akka.stream.ActorMaterializer +import com.typesafe.config.Config trait Core { @@ -14,13 +16,13 @@ trait Core { val blockingDispatcher: ExecutionContext = system.dispatchers.lookup("crashbox.blocking-dispatcher") - def log = system.log - def config = system.settings.config + def log: LoggingAdapter = system.log + def config: Config = system.settings.config sys.addShutdownHook { - log.info("Shutting down systm") + log.info("Shutting down core system") Await.ready(system.terminate(), Duration.Inf) - println("shutdown") + log.info("System stopped") } } diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Builders.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Executors.scala index 1532832..045bf89 100644 --- a/crashbox-server/src/main/scala/io/crashbox/ci/Builders.scala +++ b/crashbox-server/src/main/scala/io/crashbox/ci/Executors.scala @@ -16,7 +16,7 @@ import com.spotify.docker.client.exceptions.ContainerNotFoundException import com.spotify.docker.client.messages.{ContainerConfig, HostConfig} import com.spotify.docker.client.messages.HostConfig.Bind -trait Builders { core: Core => +trait Executors { core: Core => val dockerClient = DefaultDockerClient.builder().uri("unix:///run/docker.sock").build() @@ -29,14 +29,16 @@ trait Builders { core: Core => def containerWorkDirectory = "/home/crashbox" def containerKillTimeout = 10.seconds - case class ContainerId(id: String) { - override def toString = id + case class ExecutionId(containerId: String) { + override def toString = containerId } - def startBuild(image: String, - script: String, - dir: File, - out: OutputStream): Future[ContainerId] = + def startExecution( + image: String, + script: String, + dir: File, + out: OutputStream + ): Future[ExecutionId] = Future { val volume = Bind .builder() @@ -77,23 +79,24 @@ trait Builders { core: Core => } } } - ContainerId(container) + ExecutionId(container) }(blockingDispatcher) - def waitBuild(id: ContainerId): Future[Int] = + def waitExecution(id: ExecutionId): Future[Int] = Future { log.debug(s"Waiting for container $id to exit") - val res: Int = dockerClient.waitContainer(id.id).statusCode() - cancelBuild(id) + val res: Int = dockerClient.waitContainer(id.containerId).statusCode() + cancelExecution(id) res }(blockingDispatcher) - def cancelBuild(id: ContainerId): Unit = { - log.debug(s"Stopping container $id") + def cancelExecution(id: ExecutionId): Unit = { try { - dockerClient.stopContainer(id.id, + log.debug(s"Stopping container $id") + dockerClient.stopContainer(id.containerId, containerKillTimeout.toUnit(SECONDS).toInt) - dockerClient.removeContainer(id.id) + log.debug(s"Removing container $id") + dockerClient.removeContainer(id.containerId) } catch { case _: ContainerNotFoundException => // build already cancelled } @@ -107,7 +110,8 @@ trait Builders { core: Core => ) .asScala stale.foreach { container => - dockerClient.removeContainer(container.id()) + log.warning(s"Removing stale container ${container.id}") + dockerClient.removeContainer(container.id) } } diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/HttpApi.scala b/crashbox-server/src/main/scala/io/crashbox/ci/HttpApi.scala index 71a4f5b..c9e62c3 100644 --- a/crashbox-server/src/main/scala/io/crashbox/ci/HttpApi.scala +++ b/crashbox-server/src/main/scala/io/crashbox/ci/HttpApi.scala @@ -1,7 +1,7 @@ package io.crashbox.ci import java.net.URL -import java.security.MessageDigest +import java.util.UUID import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ import akka.http.scaladsl.marshalling.{Marshaller, ToResponseMarshaller} @@ -10,24 +10,27 @@ import akka.http.scaladsl.model.HttpEntity.ChunkStreamPart import akka.http.scaladsl.model.HttpResponse import akka.http.scaladsl.server._ import akka.http.scaladsl.server.Directives._ -import akka.stream.OverflowStrategy import akka.stream.scaladsl.{Source => Src} -import spray.json.DefaultJsonProtocol +import akka.stream.scaladsl.StreamConverters +import spray.json._ -trait HttpApi { self: Core with Schedulers with StreamStore => +trait HttpApi { self: Core with Schedulers with Storage => val endpoint = "api" - case class Request(url: String) { - def buildId: String = { - val bytes = MessageDigest.getInstance("SHA-256").digest(url.getBytes) - bytes.map { byte => - Integer.toString((byte & 0xff) + 0x100, 16) - }.mkString - } - } + case class Request(url: URL) {} object Protocol extends DefaultJsonProtocol { + val urlReader = new JsonReader[URL] { + override def read(js: JsValue) = js match { + case JsString(str) => new URL(str) + case _ => deserializationError("Expected valid url string") + } + } + val urlWriter = new JsonWriter[URL] { + override def write(url: URL) = JsString(url.toString()) + } + implicit val urlFormat: JsonFormat[URL] = jsonFormat(urlReader, urlWriter) implicit val request = jsonFormat1(Request) } import Protocol._ @@ -43,25 +46,25 @@ trait HttpApi { self: Core with Schedulers with StreamStore => path("submit") { post { entity(as[Request]) { req => - val source = Src - .queue[String](100, OverflowStrategy.fail) - .mapMaterializedValue { q => - q.offer(s"Build ID: ${req.buildId}") - start( - req.buildId, - new URL(req.url), - () => saveStream(req.buildId), - state => q.offer(state.toString) - ) - } - complete(source) + val scheduled = scheduleBuild(req.url).map(_.toString()) + complete(scheduled) } } } ~ path(Segment / "cancel") { buildId => post { - cancel(buildId) - complete(204 -> s"Cancelled $buildId") + cancelBuild(UUID.fromString(buildId)) + complete(204 -> None) + } + } ~ + path(Segment / "logs") { buildId => + get { + val src = StreamConverters + .fromInputStream(() => readLog(UUID.fromString(buildId), 0)) + .map { bs => + bs.utf8String + } + complete(src) } } } diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Main.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Main.scala index 4961b86..74a5527 100644 --- a/crashbox-server/src/main/scala/io/crashbox/ci/Main.scala +++ b/crashbox-server/src/main/scala/io/crashbox/ci/Main.scala @@ -7,23 +7,22 @@ import akka.http.scaladsl.Http object Main extends Core with Schedulers - with Builders + with Storage + with Executors with Parsers with Source - with StreamStore with HttpApi { def main(args: Array[String]): Unit = { reapDeadBuilds() - val host = system.settings.config.getString("crashbox.host") - val port = system.settings.config.getInt("crashbox.port") - + val host = config.getString("crashbox.host") + val port = config.getInt("crashbox.port") Http(system).bindAndHandle(httpApi, host, port) onComplete { case Success(_) => - system.log.info(s"Listening on $host:$port") + log.info(s"Listening on $host:$port") case Failure(ex) => - system.log.error(ex, s"Failed to bind to $host:$port") + log.error(ex, s"Failed to bind to $host:$port") system.terminate() } } diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Schedulers.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Schedulers.scala index 3600fc1..cb4e793 100644 --- a/crashbox-server/src/main/scala/io/crashbox/ci/Schedulers.scala +++ b/crashbox-server/src/main/scala/io/crashbox/ci/Schedulers.scala @@ -5,11 +5,15 @@ import java.net.URL import java.nio.file.Files import scala.collection.mutable.HashMap +import scala.concurrent.Future +import scala.concurrent.duration._ import scala.util.{Failure, Success} import akka.actor.{Actor, ActorLogging, ActorRef, Props, Terminated} +import akka.util.Timeout -trait Schedulers extends { self: Core with Source with Builders with Parsers => +trait Schedulers { + self: Core with Source with Executors with Parsers with Storage => private def newTempDir: File = Files.createTempDirectory("crashbox-run").toFile() @@ -18,25 +22,24 @@ trait Schedulers extends { self: Core with Source with Builders with Parsers => case class Cloning(url: URL) extends BuildState case class Parsing(dir: File) extends BuildState case class Starting(dir: File, buildDef: BuildDef) extends BuildState - case class Running(id: ContainerId) extends BuildState + case class Running(id: ExecutionId) extends BuildState sealed trait EndBuildState extends BuildState case class Finished(status: Int) extends EndBuildState case class Failed(message: String) extends EndBuildState class BuildManager( - url: URL, - openOut: () => OutputStream, - update: BuildState => Unit + buildId: BuildId, + url: URL ) extends Actor with ActorLogging { var buildDir: Option[File] = None var out: Option[OutputStream] = None - var containerId: Option[ContainerId] = None + var containerId: Option[ExecutionId] = None override def postStop() = { - containerId foreach { cancelBuild(_) } + containerId foreach { cancelExecution(_) } out foreach { _.close() } buildDir foreach { _.delete() } log.info(s"Stopped build of $url") @@ -51,7 +54,7 @@ trait Schedulers extends { self: Core with Source with Builders with Parsers => case state @ Cloning(url) => log.debug("Update build state: cloning") - update(state) + updateBuildState(buildId, state) fetchSource(url, newTempDir) onComplete { case Success(dir) => self ! Parsing(dir) @@ -61,7 +64,7 @@ trait Schedulers extends { self: Core with Source with Builders with Parsers => case state @ Parsing(src) => log.debug("Update build state: parsing") - update(state) + updateBuildState(buildId, state) buildDir = Some(src) parseBuild(src) match { case Left(buildDef) => @@ -72,10 +75,10 @@ trait Schedulers extends { self: Core with Source with Builders with Parsers => case state @ Starting(src, bd) => log.debug("Update build state: starting") - update(state) - val so = openOut() + updateBuildState(buildId, state) + val so = saveLog(buildId, 0) out = Some(so) - startBuild(bd.image, bd.script, src, so) onComplete { + startExecution(bd.image, bd.script, src, so) onComplete { case Success(id) => self ! Running(id) case Failure(err) => @@ -84,9 +87,9 @@ trait Schedulers extends { self: Core with Source with Builders with Parsers => case state @ Running(id) => log.debug("Update build state: running") - update(state) + updateBuildState(buildId, state) containerId = Some(id) - waitBuild(id) onComplete { + waitExecution(id) onComplete { case Success(status) => self ! Finished(status) case Failure(err) => @@ -95,48 +98,36 @@ trait Schedulers extends { self: Core with Source with Builders with Parsers => case state @ Finished(status) => log.debug("Update build state: finished") - update(state) + updateBuildState(buildId, state) context stop self case state @ Failed(message) => log.debug("Update build state: failed") - update(state) + updateBuildState(buildId, state) context stop self } } object BuildManager { - def apply(buildId: String, - url: URL, - out: () => OutputStream, - update: BuildState => Unit) = - Props(new BuildManager(url, out, update)) + def apply(buildId: BuildId, url: URL) = + Props(new BuildManager(buildId, url)) } private sealed trait SchedulerCommand - private case class ScheduleBuild( - buildId: String, - url: URL, - out: () => OutputStream, - update: BuildState => Unit - ) extends SchedulerCommand - private case class CancelBuild(buildId: String) extends SchedulerCommand + private case class ScheduleBuild(url: URL) extends SchedulerCommand + private case class CancelBuild(buildId: BuildId) extends SchedulerCommand class Scheduler extends Actor { - - val runningBuilds = new HashMap[String, ActorRef] + val runningBuilds = new HashMap[BuildId, ActorRef] override def receive = { - case sb: ScheduleBuild => - runningBuilds.get(sb.buildId) match { - case Some(_) => //already running - case None => - val buildManager = context.actorOf( - BuildManager(sb.buildId, sb.url, sb.out, sb.update), - s"build-${sb.buildId}") - context watch buildManager - runningBuilds += sb.buildId -> buildManager - } + case ScheduleBuild(url) => + val buildId = newBuildId() + val buildManager = + context.actorOf(BuildManager(buildId, url), s"build-${buildId}") + context watch buildManager + runningBuilds += buildId -> buildManager + sender ! buildId case CancelBuild(id) => runningBuilds.get(id).foreach { builder => @@ -154,16 +145,14 @@ trait Schedulers extends { self: Core with Source with Builders with Parsers => private val scheduler = system.actorOf(Props(new Scheduler()), "crashbox-scheduler") - def start( - buildId: String, - url: URL, - out: () => OutputStream, - update: BuildState => Unit - ): Unit = { - scheduler ! ScheduleBuild(buildId, url, out, update) + // None if build can not be scheduled (queue is full) + def scheduleBuild(url: URL): Future[BuildId] = { + import akka.pattern.ask + implicit val timeout: Timeout = Timeout(5.seconds) + (scheduler ? ScheduleBuild(url)).mapTo[BuildId] } - def cancel(buildId: String): Unit = { + def cancelBuild(buildId: BuildId): Unit = { scheduler ! CancelBuild(buildId) } diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/StateStore.scala b/crashbox-server/src/main/scala/io/crashbox/ci/StateStore.scala deleted file mode 100644 index 3474407..0000000 --- a/crashbox-server/src/main/scala/io/crashbox/ci/StateStore.scala +++ /dev/null @@ -1,3 +0,0 @@ -package io.crashbox.ci - -trait StateStore {} diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Storage.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Storage.scala new file mode 100644 index 0000000..1035940 --- /dev/null +++ b/crashbox-server/src/main/scala/io/crashbox/ci/Storage.scala @@ -0,0 +1,62 @@ +package io.crashbox.ci + +import java.io.{ + BufferedInputStream, + File, + FileInputStream, + FileOutputStream, + InputStream, + OutputStream +} +import java.util.UUID + +import slick.driver.H2Driver.api._ + +trait Storage { self: Core with Parsers with Schedulers => + + type BuildId = UUID + + + + def newBuildId() = UUID.randomUUID() + + + + private val streamsDirectory: File = new File( + config.getString("crashbox.streams.directory")) + + private def logFile(buildId: BuildId, task: Int): File = { + def stringifyId(id: BuildId): String = { + val bytes = new Array[Byte](16) // 128 bits + for (i <- 0 until 8) { + bytes(i) = ((id.getLeastSignificantBits >> i) & 0xff).toByte + } + for (i <- 0 until 8) { + bytes(8 + i) = ((id.getMostSignificantBits >> i) & 0xff).toByte + } + bytes.map { byte => + Integer.toString((byte & 0xff) + 0x100, 16) + }.mkString + } + val (dir1, tail) = stringifyId(buildId).splitAt(2) + val (dir2, dir3) = tail.splitAt(2) + new File(streamsDirectory, s"$dir1/$dir2/$dir3/$task") + } + + def saveLog(buildId: BuildId, task: Int): OutputStream = { + val file = logFile(buildId, task) + file.getParentFile.mkdirs() + file.createNewFile() + file.setWritable(true) + new FileOutputStream(file) + } + + def readLog(buildId: BuildId, task: Int): InputStream = { + new FileInputStream(logFile(buildId, task)) + } + + def updateBuildState(buildId: BuildId, state: BuildState) = { + log.info(s"Build $buildId: state update $state") + } + +} diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/StreamStore.scala b/crashbox-server/src/main/scala/io/crashbox/ci/StreamStore.scala deleted file mode 100644 index bba3cdf..0000000 --- a/crashbox-server/src/main/scala/io/crashbox/ci/StreamStore.scala +++ /dev/null @@ -1,38 +0,0 @@ -package io.crashbox.ci - -import java.io.{ - File, - FileInputStream, - FileOutputStream, - InputStream, - OutputStream -} -import java.security.MessageDigest - -trait StreamStore { self: Core => - - val streamsDirectory: File = new File( - config.getString("crashbox.streams.directory")) - - private def logFile(id: String): File = { - val bytes = MessageDigest.getInstance("SHA-256").digest(id.getBytes) - val str = bytes.map { byte => - Integer.toString((byte & 0xff) + 0x100, 16) - }.mkString - val (head, tail) = str.splitAt(2) - new File(streamsDirectory, s"$head/$tail") - } - - def saveStream(id: String): OutputStream = { - val file = logFile(id) - file.getParentFile.mkdirs() - file.createNewFile() - file.setWritable(true) - new FileOutputStream(file) - } - - def readStream(id: String): InputStream = { - new FileInputStream(logFile(id)) - } - -} diff --git a/crashbox-server/src/test/scala/io/crashbox/ci/TestUtil.scala b/crashbox-server/src/test/scala/io/crashbox/ci/TestUtil.scala index a7e7ae6..2c68d87 100644 --- a/crashbox-server/src/test/scala/io/crashbox/ci/TestUtil.scala +++ b/crashbox-server/src/test/scala/io/crashbox/ci/TestUtil.scala @@ -1,6 +1,6 @@ package io.crashbox.ci -import java.io.{File, OutputStream} +import java.io.File import java.nio.file.Files object TestUtil { diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 2d768d7..6038711 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -15,6 +15,9 @@ object Dependencies { val dockerClient = "com.spotify" % "docker-client" % "8.1.1" + val slick = "com.typesafe.slick" %% "slick" % "3.2.0" + //"com.typesafe.slick" %% "slick-hikaricp" % "3.2.0" + val scalatest = "org.scalatest" %% "scalatest" % "3.0.1" } |