summaryrefslogtreecommitdiff
path: root/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala
diff options
context:
space:
mode:
Diffstat (limited to 'crashboxd/src/main/scala/io/crashbox/ci/Builder.scala')
-rw-r--r--crashboxd/src/main/scala/io/crashbox/ci/Builder.scala220
1 files changed, 220 insertions, 0 deletions
diff --git a/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala b/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala
new file mode 100644
index 0000000..f77ce0a
--- /dev/null
+++ b/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala
@@ -0,0 +1,220 @@
+package io.crashbox.ci
+
+import akka.actor.{ Actor, ActorLogging, ActorRef, ActorSystem }
+import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler, StageLogging }
+import akka.stream.{ Attributes, FanInShape2, Inlet, Outlet }
+import akka.stream.stage.{ GraphStage }
+import io.crashbox.ci.DockerExecutor.ExecutionId
+import java.io.{ File, OutputStream }
+import java.util.concurrent.atomic.AtomicInteger
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{ HashMap, Queue }
+import scala.concurrent.Future
+import scala.util.{ Failure, Success }
+
+
+case class BuildId(id: String) extends AnyVal
+
+case class TaskId(buildId: String, taskIdx: Int) {
+ override def toString = s"$buildId#$taskIdx"
+}
+
+class BuildStage(
+ executor: DockerExecutor,
+ mkdir: TaskId => File,
+ mkout: TaskId => OutputStream
+) extends GraphStage[FanInShape2[(TaskId, TaskDef), TaskId, Builder.BuildState]] {
+ import Builder._
+
+ val submissions = Inlet[(TaskId, TaskDef)]("Builder.submissions")
+ val cancellations = Inlet[TaskId]("Builder.cancellations")
+ val states = Outlet[BuildState]("Builder.states")
+
+ val shape = new FanInShape2(submissions, cancellations, states)
+
+ def createLogic(attributes: Attributes) = new GraphStageLogic(shape) with StageLogging {
+ implicit def ec = materializer.executionContext
+ //import scala.concurrent.ExecutionContext.Implicits.global
+
+ var runningTaskId: Option[TaskId] = None
+ var runningExecutionId: Future[ExecutionId] =
+ Future.failed(new RuntimeException("not started"))
+
+ override def preStart(): Unit = {
+ log.info("Starting build stage")
+ pull(cancellations)
+ pull(submissions)
+ }
+
+ setHandler(cancellations, new InHandler {
+
+ override def onPush(): Unit = {
+ val tid = grab(cancellations)
+ if (runningTaskId == tid) {
+ runningExecutionId.foreach { eid =>
+ executor.stop(eid)
+ }
+ }
+ pull(cancellations)
+ }
+
+ // don't finish stage when cancellations closes
+ override def onUpstreamFinish() = {}
+
+ })
+
+ setHandler(submissions, new InHandler {
+
+ val changeState = getAsyncCallback[BuildState]{state =>
+ log.info(s"${state.taskId} transitioned to $state")
+ emit(states, state)
+ }
+ val pullNext = getAsyncCallback[Unit]{ _ =>
+ if (isAvailable(states)) {
+ pull(submissions)
+ }
+ }
+
+ override def onPush(): Unit = {
+ val (tid, tdef) = grab(submissions)
+ log.info(s"$tid new submission $tdef")
+
+ val image = tdef.environment match {
+ case DockerEnvironment(image) => image
+ case env => sys.error("Unsupported environemnt")
+ }
+
+ changeState.invoke(TaskStarting(tid, tdef))
+ runningTaskId = Some(tid)
+ runningExecutionId = executor.start(
+ image,
+ tdef.script,
+ mkdir(tid),
+ mkout(tid)
+ ).map{ eid =>
+ changeState.invoke(TaskRunning(tid, eid))
+ eid
+ }
+
+ runningExecutionId.flatMap { eid =>
+ executor.result(eid)
+ } onComplete { result =>
+ result match {
+ case Success(status) =>
+ changeState.invoke(TaskFinished(tid, status))
+ case Failure(err) =>
+ changeState.invoke(TaskFailed(tid, err.toString))
+ }
+ pullNext.invoke(())
+ }
+ }
+
+ override def onUpstreamFinish() = {
+ log.info(s"${submissions} finished, completing stage after final build")
+ val callback = getAsyncCallback[Unit]{_ =>
+ completeStage()
+ }
+ runningExecutionId.onComplete { _ => callback.invoke(()) }
+ }
+
+ })
+
+ setHandler(states, GraphStageLogic.IgnoreTerminateOutput)
+
+ override def postStop() = {
+ log.info("postStop")
+ runningExecutionId foreach { eid =>
+ log.error("kill")
+ executor.stop(eid)
+ }
+ }
+
+ }
+}
+
+// class Builder(
+// listener: ActorRef,
+// exec: DockerExecutor,
+// mkdir: (BuildId, Int) => File,
+// mkout: (BuildId, Int) => OutputStream
+// ) extends Actor with ActorLogging {
+// import context._
+// import Builder._
+
+// val queue = Queue.empty[(BuildId, Int, BuildDef)]
+// val running = TrieMap.empty[(BuildId, Int), ExecutionId]
+
+// val Parallelism = 2
+// var slots = Parallelism
+
+// def receive = {
+
+// case Cancel(bid) =>
+// running.find(_._1._1 == bid).foreach { case (_, eid) =>
+// // stopping will cause the pipeline to fail an thus remove itself from
+// // the running map
+// exec.stop(eid)
+// }
+
+// case Done =>
+// slots += 1
+// listener ! Next
+
+// case Next if slots > 0 =>
+// slots -= 1
+// val (bid, tid, bdf) = queue.dequeue()
+// val tdf = bdf.tasks(tid)
+// val image = tdf.environment match {
+// case DockerEnvironment(image) => image
+// case env => {log.error(s"can't run $env"); ???}
+// }
+
+// listener ! TaskStarting(bid, tid, tdf)
+
+// val pipeline = for (
+// eid <- exec.start(image, tdf.script, mkdir(bid, tid), mkout(bid, tid));
+// _ = listener ! TaskRunning(bid, tid, eid);
+// _ = running += (bid, tid) -> eid;
+// status <- exec.result(eid)
+// ) yield {
+// status
+// }
+// pipeline onComplete { res =>
+// running -= ((bid, tid))
+// self ! Done
+// res match {
+// case Success(status) => listener ! TaskFinished(bid, tid, status)
+// case Failure(err) => listener ! TaskFailed(bid, tid, err.toString)
+// }
+// }
+
+// case Next => // no slots available, do nothing
+
+// }
+
+// override def postStop() = {
+// for ((_, eid) <- running) {
+// exec.stop(eid)
+// }
+// }
+
+// }
+
+object Builder {
+
+ //case class Submit(id: BuildId, build: BuildDef)
+ //case class Cancel(id: BuildId)
+
+ private case object Next
+ private case object Done
+
+ sealed trait BuildState {
+ def taskId: TaskId
+ }
+ case class TaskStarting(taskId: TaskId, taskDef: TaskDef) extends BuildState
+ case class TaskRunning(taskId: TaskId, execId: ExecutionId) extends BuildState
+
+ case class TaskFinished(taskId: TaskId, status: Int) extends BuildState
+ case class TaskFailed(taskId: TaskId, message: String) extends BuildState
+
+}