From dff97b40df39e5eb391e1fc30ca47180384f1747 Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Sat, 18 Mar 2017 01:58:01 -0400 Subject: Refactor trait-based dependencies to constructor parameters --- .../src/main/scala/io/crashbox/ci/BuildDef.scala | 8 + .../src/main/scala/io/crashbox/ci/Core.scala | 2 +- .../main/scala/io/crashbox/ci/DockerExecutor.scala | 119 ++++++++++++++ .../src/main/scala/io/crashbox/ci/Executors.scala | 118 -------------- .../src/main/scala/io/crashbox/ci/Git.scala | 21 +++ .../src/main/scala/io/crashbox/ci/HttpApi.scala | 9 +- .../src/main/scala/io/crashbox/ci/Main.scala | 24 +-- .../src/main/scala/io/crashbox/ci/Parser.scala | 34 ++++ .../src/main/scala/io/crashbox/ci/Parsers.scala | 41 ----- .../src/main/scala/io/crashbox/ci/Scheduler.scala | 172 +++++++++++++++++++++ .../src/main/scala/io/crashbox/ci/Schedulers.scala | 162 ------------------- .../src/main/scala/io/crashbox/ci/Source.scala | 19 --- .../src/main/scala/io/crashbox/ci/Storage.scala | 6 +- .../src/main/scala/io/crashbox/ci/package.scala | 9 ++ 14 files changed, 385 insertions(+), 359 deletions(-) create mode 100644 crashbox-server/src/main/scala/io/crashbox/ci/BuildDef.scala create mode 100644 crashbox-server/src/main/scala/io/crashbox/ci/DockerExecutor.scala delete mode 100644 crashbox-server/src/main/scala/io/crashbox/ci/Executors.scala create mode 100644 crashbox-server/src/main/scala/io/crashbox/ci/Git.scala create mode 100644 crashbox-server/src/main/scala/io/crashbox/ci/Parser.scala delete mode 100644 crashbox-server/src/main/scala/io/crashbox/ci/Parsers.scala create mode 100644 crashbox-server/src/main/scala/io/crashbox/ci/Scheduler.scala delete mode 100644 crashbox-server/src/main/scala/io/crashbox/ci/Schedulers.scala delete mode 100644 crashbox-server/src/main/scala/io/crashbox/ci/Source.scala create mode 100644 crashbox-server/src/main/scala/io/crashbox/ci/package.scala diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/BuildDef.scala b/crashbox-server/src/main/scala/io/crashbox/ci/BuildDef.scala new file mode 100644 index 0000000..898c222 --- /dev/null +++ b/crashbox-server/src/main/scala/io/crashbox/ci/BuildDef.scala @@ -0,0 +1,8 @@ +package io.crashbox.ci + +case class BuildDef( + image: String, + script: String +) + +case class ParseError(message: String) diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Core.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Core.scala index 39a1aec..cb1db5c 100644 --- a/crashbox-server/src/main/scala/io/crashbox/ci/Core.scala +++ b/crashbox-server/src/main/scala/io/crashbox/ci/Core.scala @@ -8,7 +8,7 @@ import akka.event.LoggingAdapter import akka.stream.ActorMaterializer import com.typesafe.config.Config -trait Core { +class Core { implicit val system: ActorSystem = ActorSystem("crashbox") implicit val materializer = ActorMaterializer() diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/DockerExecutor.scala b/crashbox-server/src/main/scala/io/crashbox/ci/DockerExecutor.scala new file mode 100644 index 0000000..b5405c7 --- /dev/null +++ b/crashbox-server/src/main/scala/io/crashbox/ci/DockerExecutor.scala @@ -0,0 +1,119 @@ +package io.crashbox.ci + +import java.io.{File, OutputStream} + +import scala.collection.JavaConverters._ +import scala.concurrent.Future +import scala.concurrent.duration._ + +import com.spotify.docker.client.DefaultDockerClient +import com.spotify.docker.client.DockerClient.{ + AttachParameter, + ListContainersParam +} +import com.spotify.docker.client.LogStream +import com.spotify.docker.client.exceptions.ContainerNotFoundException +import com.spotify.docker.client.messages.{ContainerConfig, HostConfig} +import com.spotify.docker.client.messages.HostConfig.Bind + +case class ExecutionId(containerId: String) { + override def toString = containerId +} + +class DockerExecutor(implicit core: Core) { + import core._ + + val dockerClient = + DefaultDockerClient.builder().uri("unix:///run/docker.sock").build() + + core.system.registerOnTermination { + dockerClient.close() + } + + def containerUser = "crashbox" + def containerWorkDirectory = "/home/crashbox" + def containerKillTimeout = 10.seconds + + def startExecution( + image: String, + script: String, + dir: File, + out: OutputStream + ): Future[ExecutionId] = + Future { + val volume = Bind + .builder() + .from(dir.getAbsolutePath) + .to(containerWorkDirectory) + .build() + val hostConfig = HostConfig.builder().binds(volume).build() + val containerConfig = ContainerConfig + .builder() + .labels(Map("crashbox" -> "build").asJava) + .hostConfig(hostConfig) + .tty(true) // combine stdout and stderr into stdout + .image(image) + .user(containerUser) + .workingDir(containerWorkDirectory) + .entrypoint("/bin/sh", "-c") + .cmd(script) + .build() + val container = dockerClient.createContainer(containerConfig).id + + log.debug(s"Starting container $container") + dockerClient.startContainer(container) + + log.debug(s"Attaching log stream of container $container") + blockingDispatcher execute new Runnable { + override def run() = { + var stream: LogStream = null + try { + stream = dockerClient.attachContainer( + container, + AttachParameter.LOGS, + AttachParameter.STDOUT, + AttachParameter.STREAM + ) + stream.attach(out, null, true) + } finally { + if (stream != null) stream.close() + } + } + } + ExecutionId(container) + }(blockingDispatcher) + + def waitExecution(id: ExecutionId): Future[Int] = + Future { + log.debug(s"Waiting for container $id to exit") + val res: Int = dockerClient.waitContainer(id.containerId).statusCode() + cancelExecution(id) + res + }(blockingDispatcher) + + def cancelExecution(id: ExecutionId): Unit = { + try { + log.debug(s"Stopping container $id") + dockerClient.stopContainer(id.containerId, + containerKillTimeout.toUnit(SECONDS).toInt) + log.debug(s"Removing container $id") + dockerClient.removeContainer(id.containerId) + } catch { + case _: ContainerNotFoundException => // build already cancelled + } + } + + def reapDeadBuilds(): Unit = { + val stale = dockerClient + .listContainers( + ListContainersParam.withLabel("crashbox"), + ListContainersParam.withStatusExited() + ) + .asScala + stale.foreach { container => + log.warning(s"Removing stale container ${container.id}") + dockerClient.removeContainer(container.id) + } + } + +} diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Executors.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Executors.scala deleted file mode 100644 index 045bf89..0000000 --- a/crashbox-server/src/main/scala/io/crashbox/ci/Executors.scala +++ /dev/null @@ -1,118 +0,0 @@ -package io.crashbox.ci - -import java.io.{File, OutputStream} - -import scala.collection.JavaConverters._ -import scala.concurrent.Future -import scala.concurrent.duration._ - -import com.spotify.docker.client.DefaultDockerClient -import com.spotify.docker.client.DockerClient.{ - AttachParameter, - ListContainersParam -} -import com.spotify.docker.client.LogStream -import com.spotify.docker.client.exceptions.ContainerNotFoundException -import com.spotify.docker.client.messages.{ContainerConfig, HostConfig} -import com.spotify.docker.client.messages.HostConfig.Bind - -trait Executors { core: Core => - - val dockerClient = - DefaultDockerClient.builder().uri("unix:///run/docker.sock").build() - - core.system.registerOnTermination { - dockerClient.close() - } - - def containerUser = "crashbox" - def containerWorkDirectory = "/home/crashbox" - def containerKillTimeout = 10.seconds - - case class ExecutionId(containerId: String) { - override def toString = containerId - } - - def startExecution( - image: String, - script: String, - dir: File, - out: OutputStream - ): Future[ExecutionId] = - Future { - val volume = Bind - .builder() - .from(dir.getAbsolutePath) - .to(containerWorkDirectory) - .build() - val hostConfig = HostConfig.builder().binds(volume).build() - val containerConfig = ContainerConfig - .builder() - .labels(Map("crashbox" -> "build").asJava) - .hostConfig(hostConfig) - .tty(true) // combine stdout and stderr into stdout - .image(image) - .user(containerUser) - .workingDir(containerWorkDirectory) - .entrypoint("/bin/sh", "-c") - .cmd(script) - .build() - val container = dockerClient.createContainer(containerConfig).id - - log.debug(s"Starting container $container") - dockerClient.startContainer(container) - - log.debug(s"Attaching log stream of container $container") - blockingDispatcher execute new Runnable { - override def run() = { - var stream: LogStream = null - try { - stream = dockerClient.attachContainer( - container, - AttachParameter.LOGS, - AttachParameter.STDOUT, - AttachParameter.STREAM - ) - stream.attach(out, null, true) - } finally { - if (stream != null) stream.close() - } - } - } - ExecutionId(container) - }(blockingDispatcher) - - def waitExecution(id: ExecutionId): Future[Int] = - Future { - log.debug(s"Waiting for container $id to exit") - val res: Int = dockerClient.waitContainer(id.containerId).statusCode() - cancelExecution(id) - res - }(blockingDispatcher) - - def cancelExecution(id: ExecutionId): Unit = { - try { - log.debug(s"Stopping container $id") - dockerClient.stopContainer(id.containerId, - containerKillTimeout.toUnit(SECONDS).toInt) - log.debug(s"Removing container $id") - dockerClient.removeContainer(id.containerId) - } catch { - case _: ContainerNotFoundException => // build already cancelled - } - } - - def reapDeadBuilds(): Unit = { - val stale = dockerClient - .listContainers( - ListContainersParam.withLabel("crashbox"), - ListContainersParam.withStatusExited() - ) - .asScala - stale.foreach { container => - log.warning(s"Removing stale container ${container.id}") - dockerClient.removeContainer(container.id) - } - } - -} diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Git.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Git.scala new file mode 100644 index 0000000..db39891 --- /dev/null +++ b/crashbox-server/src/main/scala/io/crashbox/ci/Git.scala @@ -0,0 +1,21 @@ +package io.crashbox.ci + +import java.io.File +import java.net.URL + +import scala.concurrent.Future + +import org.eclipse.jgit.api.{Git => JGit} + +object Git { + + def fetchSource(from: URL, to: File)(implicit core: Core): Future[File] = { + import core._ + Future { + log.debug(s"Cloning git repo from $from to $to") + JGit.cloneRepository.setURI(from.toURI.toString).setDirectory(to).call() + to + }(blockingDispatcher) + } + +} diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/HttpApi.scala b/crashbox-server/src/main/scala/io/crashbox/ci/HttpApi.scala index c9e62c3..cbdbbf1 100644 --- a/crashbox-server/src/main/scala/io/crashbox/ci/HttpApi.scala +++ b/crashbox-server/src/main/scala/io/crashbox/ci/HttpApi.scala @@ -14,7 +14,8 @@ import akka.stream.scaladsl.{Source => Src} import akka.stream.scaladsl.StreamConverters import spray.json._ -trait HttpApi { self: Core with Schedulers with Storage => +class HttpApi(scheduler: Scheduler, storage: Storage)(implicit core: Core) { + import core._ val endpoint = "api" @@ -46,21 +47,21 @@ trait HttpApi { self: Core with Schedulers with Storage => path("submit") { post { entity(as[Request]) { req => - val scheduled = scheduleBuild(req.url).map(_.toString()) + val scheduled = scheduler.scheduleBuild(req.url).map(_.toString()) complete(scheduled) } } } ~ path(Segment / "cancel") { buildId => post { - cancelBuild(UUID.fromString(buildId)) + scheduler.cancelBuild(UUID.fromString(buildId)) complete(204 -> None) } } ~ path(Segment / "logs") { buildId => get { val src = StreamConverters - .fromInputStream(() => readLog(UUID.fromString(buildId), 0)) + .fromInputStream(() => storage.readLog(UUID.fromString(buildId), 0)) .map { bs => bs.utf8String } diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Main.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Main.scala index 55971b1..ae9499d 100644 --- a/crashbox-server/src/main/scala/io/crashbox/ci/Main.scala +++ b/crashbox-server/src/main/scala/io/crashbox/ci/Main.scala @@ -6,22 +6,24 @@ import akka.http.scaladsl.Http import scala.concurrent._ import scala.concurrent.duration._ -object Main - extends Core - with Schedulers - with Storage - with Executors - with Parsers - with Source - with HttpApi { +object Main { + + implicit val core = new Core() + import core._ + + val storage = new Storage() + val executor = new DockerExecutor + val scheduler = new Scheduler(executor, storage) + val api = new HttpApi(scheduler, storage) + def main(args: Array[String]): Unit = { - reapDeadBuilds() - Await.result(setupDatabase(), 10.seconds) + executor.reapDeadBuilds() + Await.result(storage.setupDatabase(), 10.seconds) val host = config.getString("crashbox.host") val port = config.getInt("crashbox.port") - Http(system).bindAndHandle(httpApi, host, port) onComplete { + Http(system).bindAndHandle(api.httpApi, host, port) onComplete { case Success(_) => log.info(s"Listening on $host:$port") case Failure(ex) => diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Parser.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Parser.scala new file mode 100644 index 0000000..a8b4f19 --- /dev/null +++ b/crashbox-server/src/main/scala/io/crashbox/ci/Parser.scala @@ -0,0 +1,34 @@ +package io.crashbox.ci + +import java.io.File +import java.nio.file.Files + +import scala.collection.JavaConverters._ + +object Parser { + + def defaultImage = "crashbox/default" + + def parseBuild(workdir: File): Either[BuildDef, ParseError] = { + val file = new File(workdir, ".crashbox.txt") + if (!file.exists()) { + return Right( + ParseError("No build configuration file .crashbox.txt found.")) + } + + val lines = Files.readAllLines(file.toPath).asScala.map(_.trim) + + val Pattern = """(\w+)\s*:\s*(.+)""".r + + val image = lines + .collectFirst { case Pattern("image", s) => s } + .getOrElse(defaultImage) + val script = lines.collectFirst { case Pattern("script", s) => s } + + script match { + case Some(s) => Left(BuildDef(image, s)) + case None => + Right(ParseError("No build script defined in configuration.")) + } + } +} diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Parsers.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Parsers.scala deleted file mode 100644 index 020521c..0000000 --- a/crashbox-server/src/main/scala/io/crashbox/ci/Parsers.scala +++ /dev/null @@ -1,41 +0,0 @@ -package io.crashbox.ci - -import java.io.File -import java.nio.file.Files - -import scala.collection.JavaConverters._ - -trait Parsers { - - def defaultImage = "crashbox/default" - - case class BuildDef( - image: String, - script: String - ) - - case class ParseError(message: String) - - def parseBuild(workdir: File): Either[BuildDef, ParseError] = { - val file = new File(workdir, ".crashbox.txt") - if (!file.exists()) { - return Right( - ParseError("No build configuration file .crashbox.txt found.")) - } - - val lines = Files.readAllLines(file.toPath).asScala.map(_.trim) - - val Pattern = """(\w+)\s*:\s*(.+)""".r - - val image = lines - .collectFirst { case Pattern("image", s) => s } - .getOrElse(defaultImage) - val script = lines.collectFirst { case Pattern("script", s) => s } - - script match { - case Some(s) => Left(BuildDef(image, s)) - case None => - Right(ParseError("No build script defined in configuration.")) - } - } -} diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Scheduler.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Scheduler.scala new file mode 100644 index 0000000..f263a9c --- /dev/null +++ b/crashbox-server/src/main/scala/io/crashbox/ci/Scheduler.scala @@ -0,0 +1,172 @@ +package io.crashbox.ci + +import java.io.{File, OutputStream} +import java.net.URL +import java.nio.file.Files + +import scala.collection.mutable.HashMap +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.util.{Failure, Success} + +import akka.actor.{Actor, ActorLogging, ActorRef, Props, Terminated} +import akka.util.Timeout + +class Scheduler( + val executor: DockerExecutor, + storage: Storage +)(implicit core: Core) { + import Scheduler._ + import core._ + import executor._ + import storage._ + + private def newTempDir: File = + Files.createTempDirectory("crashbox-run").toFile() + + class BuildManager( + buildId: BuildId, + url: URL + ) extends Actor + with ActorLogging { + + var buildDir: Option[File] = None + var out: Option[OutputStream] = None + var containerId: Option[ExecutionId] = None + + override def postStop() = { + containerId foreach { cancelExecution(_) } + out foreach { _.close() } + buildDir foreach { _.delete() } + log.info(s"Stopped build of $url") + } + + override def preStart() = { + log.info(s"Started build of $url") + self ! Cloning(url) + } + + override def receive: Receive = { + + case state @ Cloning(url) => + log.debug("Update build state: cloning") + updateBuildState(buildId, state) + Git.fetchSource(url, newTempDir) onComplete { + case Success(dir) => + self ! Parsing(dir) + case Failure(err) => + self ! Failed(s"Error fetching source from $url") + } + + case state @ Parsing(src) => + log.debug("Update build state: parsing") + updateBuildState(buildId, state) + buildDir = Some(src) + Parser.parseBuild(src) match { + case Left(buildDef) => + self ! Starting(src, buildDef) + case Right(err) => + self ! Failed(s"Failed to parse build $err") + } + + case state @ Starting(src, bd) => + log.debug("Update build state: starting") + updateBuildState(buildId, state) + val so = saveLog(buildId, 0) + out = Some(so) + startExecution(bd.image, bd.script, src, so) onComplete { + case Success(id) => + self ! Running(id) + case Failure(err) => + self ! Failed(s"Failed to start build $err") + } + + case state @ Running(id) => + log.debug("Update build state: running") + updateBuildState(buildId, state) + containerId = Some(id) + waitExecution(id) onComplete { + case Success(status) => + self ! Finished(status) + case Failure(err) => + self ! Failed(s"Error waiting for build to complete") + } + + case state @ Finished(status) => + log.debug("Update build state: finished") + updateBuildState(buildId, state) + context stop self + + case state @ Failed(message) => + log.debug("Update build state: failed") + updateBuildState(buildId, state) + context stop self + } + } + object BuildManager { + def apply(buildId: BuildId, url: URL) = + Props(new BuildManager(buildId, url)) + } + + private sealed trait SchedulerCommand + private case class ScheduleBuild(url: URL) extends SchedulerCommand + private case class CancelBuild(buildId: BuildId) extends SchedulerCommand + + class Scheduler extends Actor { + val runningBuilds = new HashMap[BuildId, ActorRef] + + override def receive = { + + case ScheduleBuild(url) => + val client = sender + //todo handle failure + nextBuild(url.toString).foreach{ build => + val buildManager = + context.actorOf(BuildManager(build.id, url), s"build-${build.id}") + context watch buildManager + runningBuilds += build.id -> buildManager + client ! build.id + } + + case CancelBuild(id) => + runningBuilds.get(id).foreach { builder => + context.stop(builder) + } + + case Terminated(buildManager) => + //TODO use a more efficient data structure + runningBuilds.find(_._2 == buildManager).foreach { + runningBuilds -= _._1 + } + } + } + + private val scheduler = + system.actorOf(Props(new Scheduler()), "crashbox-scheduler") + + // None if build can not be scheduled (queue is full) + def scheduleBuild(url: URL): Future[BuildId] = { + import akka.pattern.ask + implicit val timeout: Timeout = Timeout(5.seconds) + (scheduler ? ScheduleBuild(url)).mapTo[BuildId] + } + + def cancelBuild(buildId: BuildId): Unit = { + scheduler ! CancelBuild(buildId) + } + +} + +object Scheduler { + + sealed trait BuildState + case class Cloning(url: URL) extends BuildState + case class Parsing(dir: File) extends BuildState + case class Starting(dir: File, buildDef: BuildDef) extends BuildState + case class Running(id: ExecutionId) extends BuildState + + sealed trait EndBuildState extends BuildState + case class Finished(status: Int) extends EndBuildState + case class Failed(message: String) extends EndBuildState + +} diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Schedulers.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Schedulers.scala deleted file mode 100644 index 0beec31..0000000 --- a/crashbox-server/src/main/scala/io/crashbox/ci/Schedulers.scala +++ /dev/null @@ -1,162 +0,0 @@ -package io.crashbox.ci - -import java.io.{File, OutputStream} -import java.net.URL -import java.nio.file.Files - -import scala.collection.mutable.HashMap -import scala.concurrent.Future -import scala.concurrent.duration._ -import scala.util.{Failure, Success} - -import akka.actor.{Actor, ActorLogging, ActorRef, Props, Terminated} -import akka.util.Timeout - -trait Schedulers { - self: Core with Source with Executors with Parsers with Storage => - - private def newTempDir: File = - Files.createTempDirectory("crashbox-run").toFile() - - sealed trait BuildState - case class Cloning(url: URL) extends BuildState - case class Parsing(dir: File) extends BuildState - case class Starting(dir: File, buildDef: BuildDef) extends BuildState - case class Running(id: ExecutionId) extends BuildState - - sealed trait EndBuildState extends BuildState - case class Finished(status: Int) extends EndBuildState - case class Failed(message: String) extends EndBuildState - - class BuildManager( - buildId: BuildId, - url: URL - ) extends Actor - with ActorLogging { - - var buildDir: Option[File] = None - var out: Option[OutputStream] = None - var containerId: Option[ExecutionId] = None - - override def postStop() = { - containerId foreach { cancelExecution(_) } - out foreach { _.close() } - buildDir foreach { _.delete() } - log.info(s"Stopped build of $url") - } - - override def preStart() = { - log.info(s"Started build of $url") - self ! Cloning(url) - } - - override def receive: Receive = { - - case state @ Cloning(url) => - log.debug("Update build state: cloning") - updateBuildState(buildId, state) - fetchSource(url, newTempDir) onComplete { - case Success(dir) => - self ! Parsing(dir) - case Failure(err) => - self ! Failed(s"Error fetching source from $url") - } - - case state @ Parsing(src) => - log.debug("Update build state: parsing") - updateBuildState(buildId, state) - buildDir = Some(src) - parseBuild(src) match { - case Left(buildDef) => - self ! Starting(src, buildDef) - case Right(err) => - self ! Failed(s"Failed to parse build $err") - } - - case state @ Starting(src, bd) => - log.debug("Update build state: starting") - updateBuildState(buildId, state) - val so = saveLog(buildId, 0) - out = Some(so) - startExecution(bd.image, bd.script, src, so) onComplete { - case Success(id) => - self ! Running(id) - case Failure(err) => - self ! Failed(s"Failed to start build $err") - } - - case state @ Running(id) => - log.debug("Update build state: running") - updateBuildState(buildId, state) - containerId = Some(id) - waitExecution(id) onComplete { - case Success(status) => - self ! Finished(status) - case Failure(err) => - self ! Failed(s"Error waiting for build to complete") - } - - case state @ Finished(status) => - log.debug("Update build state: finished") - updateBuildState(buildId, state) - context stop self - - case state @ Failed(message) => - log.debug("Update build state: failed") - updateBuildState(buildId, state) - context stop self - } - } - object BuildManager { - def apply(buildId: BuildId, url: URL) = - Props(new BuildManager(buildId, url)) - } - - private sealed trait SchedulerCommand - private case class ScheduleBuild(url: URL) extends SchedulerCommand - private case class CancelBuild(buildId: BuildId) extends SchedulerCommand - - class Scheduler extends Actor { - val runningBuilds = new HashMap[BuildId, ActorRef] - - override def receive = { - - case ScheduleBuild(url) => - val client = sender - //todo handle failure - nextBuild(url.toString).foreach{ build => - val buildManager = - context.actorOf(BuildManager(build.id, url), s"build-${build.id}") - context watch buildManager - runningBuilds += build.id -> buildManager - client ! build.id - } - - case CancelBuild(id) => - runningBuilds.get(id).foreach { builder => - context.stop(builder) - } - - case Terminated(buildManager) => - //TODO use a more efficient data structure - runningBuilds.find(_._2 == buildManager).foreach { - runningBuilds -= _._1 - } - } - } - - private val scheduler = - system.actorOf(Props(new Scheduler()), "crashbox-scheduler") - - // None if build can not be scheduled (queue is full) - def scheduleBuild(url: URL): Future[BuildId] = { - import akka.pattern.ask - implicit val timeout: Timeout = Timeout(5.seconds) - (scheduler ? ScheduleBuild(url)).mapTo[BuildId] - } - - def cancelBuild(buildId: BuildId): Unit = { - scheduler ! CancelBuild(buildId) - } - -} diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Source.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Source.scala deleted file mode 100644 index dffb5ea..0000000 --- a/crashbox-server/src/main/scala/io/crashbox/ci/Source.scala +++ /dev/null @@ -1,19 +0,0 @@ -package io.crashbox.ci - -import java.io.File -import java.net.URL - -import scala.concurrent.Future - -import org.eclipse.jgit.api.Git - -trait Source { self: Core => - - def fetchSource(from: URL, to: File): Future[File] = - Future { - log.debug(s"Cloning git repo from $from to $to") - Git.cloneRepository.setURI(from.toURI.toString).setDirectory(to).call() - to - }(blockingDispatcher) - -} diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Storage.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Storage.scala index 0f095eb..bb995ef 100644 --- a/crashbox-server/src/main/scala/io/crashbox/ci/Storage.scala +++ b/crashbox-server/src/main/scala/io/crashbox/ci/Storage.scala @@ -12,9 +12,9 @@ import scala.concurrent.Future import java.util.UUID import slick.jdbc.H2Profile.api._ -trait Storage { self: Core with Parsers with Schedulers => - - type BuildId = UUID +class Storage(implicit core: Core) { + import core._ + import Scheduler._ case class Build( id: BuildId, diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/package.scala b/crashbox-server/src/main/scala/io/crashbox/ci/package.scala new file mode 100644 index 0000000..183f9b5 --- /dev/null +++ b/crashbox-server/src/main/scala/io/crashbox/ci/package.scala @@ -0,0 +1,9 @@ +package io.crashbox + +import java.util.UUID + +package object ci { + + type BuildId = UUID + +} -- cgit v1.2.3