diff options
Diffstat (limited to 'crashbox-server/src/main/scala/io/crashbox/ci/Scheduler.scala')
-rw-r--r-- | crashbox-server/src/main/scala/io/crashbox/ci/Scheduler.scala | 172 |
1 files changed, 172 insertions, 0 deletions
diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Scheduler.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Scheduler.scala new file mode 100644 index 0000000..f263a9c --- /dev/null +++ b/crashbox-server/src/main/scala/io/crashbox/ci/Scheduler.scala @@ -0,0 +1,172 @@ +package io.crashbox.ci + +import java.io.{File, OutputStream} +import java.net.URL +import java.nio.file.Files + +import scala.collection.mutable.HashMap +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.util.{Failure, Success} + +import akka.actor.{Actor, ActorLogging, ActorRef, Props, Terminated} +import akka.util.Timeout + +class Scheduler( + val executor: DockerExecutor, + storage: Storage +)(implicit core: Core) { + import Scheduler._ + import core._ + import executor._ + import storage._ + + private def newTempDir: File = + Files.createTempDirectory("crashbox-run").toFile() + + class BuildManager( + buildId: BuildId, + url: URL + ) extends Actor + with ActorLogging { + + var buildDir: Option[File] = None + var out: Option[OutputStream] = None + var containerId: Option[ExecutionId] = None + + override def postStop() = { + containerId foreach { cancelExecution(_) } + out foreach { _.close() } + buildDir foreach { _.delete() } + log.info(s"Stopped build of $url") + } + + override def preStart() = { + log.info(s"Started build of $url") + self ! Cloning(url) + } + + override def receive: Receive = { + + case state @ Cloning(url) => + log.debug("Update build state: cloning") + updateBuildState(buildId, state) + Git.fetchSource(url, newTempDir) onComplete { + case Success(dir) => + self ! Parsing(dir) + case Failure(err) => + self ! Failed(s"Error fetching source from $url") + } + + case state @ Parsing(src) => + log.debug("Update build state: parsing") + updateBuildState(buildId, state) + buildDir = Some(src) + Parser.parseBuild(src) match { + case Left(buildDef) => + self ! Starting(src, buildDef) + case Right(err) => + self ! Failed(s"Failed to parse build $err") + } + + case state @ Starting(src, bd) => + log.debug("Update build state: starting") + updateBuildState(buildId, state) + val so = saveLog(buildId, 0) + out = Some(so) + startExecution(bd.image, bd.script, src, so) onComplete { + case Success(id) => + self ! Running(id) + case Failure(err) => + self ! Failed(s"Failed to start build $err") + } + + case state @ Running(id) => + log.debug("Update build state: running") + updateBuildState(buildId, state) + containerId = Some(id) + waitExecution(id) onComplete { + case Success(status) => + self ! Finished(status) + case Failure(err) => + self ! Failed(s"Error waiting for build to complete") + } + + case state @ Finished(status) => + log.debug("Update build state: finished") + updateBuildState(buildId, state) + context stop self + + case state @ Failed(message) => + log.debug("Update build state: failed") + updateBuildState(buildId, state) + context stop self + } + } + object BuildManager { + def apply(buildId: BuildId, url: URL) = + Props(new BuildManager(buildId, url)) + } + + private sealed trait SchedulerCommand + private case class ScheduleBuild(url: URL) extends SchedulerCommand + private case class CancelBuild(buildId: BuildId) extends SchedulerCommand + + class Scheduler extends Actor { + val runningBuilds = new HashMap[BuildId, ActorRef] + + override def receive = { + + case ScheduleBuild(url) => + val client = sender + //todo handle failure + nextBuild(url.toString).foreach{ build => + val buildManager = + context.actorOf(BuildManager(build.id, url), s"build-${build.id}") + context watch buildManager + runningBuilds += build.id -> buildManager + client ! build.id + } + + case CancelBuild(id) => + runningBuilds.get(id).foreach { builder => + context.stop(builder) + } + + case Terminated(buildManager) => + //TODO use a more efficient data structure + runningBuilds.find(_._2 == buildManager).foreach { + runningBuilds -= _._1 + } + } + } + + private val scheduler = + system.actorOf(Props(new Scheduler()), "crashbox-scheduler") + + // None if build can not be scheduled (queue is full) + def scheduleBuild(url: URL): Future[BuildId] = { + import akka.pattern.ask + implicit val timeout: Timeout = Timeout(5.seconds) + (scheduler ? ScheduleBuild(url)).mapTo[BuildId] + } + + def cancelBuild(buildId: BuildId): Unit = { + scheduler ! CancelBuild(buildId) + } + +} + +object Scheduler { + + sealed trait BuildState + case class Cloning(url: URL) extends BuildState + case class Parsing(dir: File) extends BuildState + case class Starting(dir: File, buildDef: BuildDef) extends BuildState + case class Running(id: ExecutionId) extends BuildState + + sealed trait EndBuildState extends BuildState + case class Finished(status: Int) extends EndBuildState + case class Failed(message: String) extends EndBuildState + +} |