package io.crashbox.ci import akka.actor.{ Actor, ActorLogging, ActorRef, ActorSystem } import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler, StageLogging } import akka.stream.{ Attributes, FanInShape2, Inlet, Outlet } import akka.stream.stage.{ GraphStage } import io.crashbox.ci.DockerExecutor.ExecutionId import java.io.{ File, OutputStream } import java.util.concurrent.atomic.AtomicInteger import scala.collection.concurrent.TrieMap import scala.collection.mutable.{ HashMap, Queue } import scala.concurrent.Future import scala.util.{ Failure, Success } case class BuildId(id: String) extends AnyVal case class TaskId(buildId: String, taskIdx: Int) { override def toString = s"$buildId#$taskIdx" } class BuildStage( executor: DockerExecutor, mkdir: TaskId => File, mkout: TaskId => OutputStream ) extends GraphStage[FanInShape2[(TaskId, TaskDef), TaskId, Builder.BuildState]] { import Builder._ val submissions = Inlet[(TaskId, TaskDef)]("Builder.submissions") val cancellations = Inlet[TaskId]("Builder.cancellations") val states = Outlet[BuildState]("Builder.states") val shape = new FanInShape2(submissions, cancellations, states) def createLogic(attributes: Attributes) = new GraphStageLogic(shape) with StageLogging { implicit def ec = materializer.executionContext //import scala.concurrent.ExecutionContext.Implicits.global var runningTaskId: Option[TaskId] = None var runningExecutionId: Future[ExecutionId] = Future.failed(new RuntimeException("not started")) override def preStart(): Unit = { log.info("Starting build stage") pull(cancellations) pull(submissions) } setHandler(cancellations, new InHandler { override def onPush(): Unit = { val tid = grab(cancellations) if (runningTaskId == tid) { runningExecutionId.foreach { eid => executor.stop(eid) } } pull(cancellations) } // don't finish stage when cancellations closes override def onUpstreamFinish() = {} }) setHandler(submissions, new InHandler { val changeState = getAsyncCallback[BuildState]{state => log.info(s"${state.taskId} transitioned to $state") emit(states, state) } val pullNext = getAsyncCallback[Unit]{ _ => if (isAvailable(states)) { pull(submissions) } } override def onPush(): Unit = { val (tid, tdef) = grab(submissions) log.info(s"$tid new submission $tdef") val image = tdef.environment match { case DockerEnvironment(image) => image case env => sys.error("Unsupported environemnt") } changeState.invoke(TaskStarting(tid, tdef)) runningTaskId = Some(tid) runningExecutionId = executor.start( image, tdef.script, mkdir(tid), mkout(tid) ).map{ eid => changeState.invoke(TaskRunning(tid, eid)) eid } runningExecutionId.flatMap { eid => executor.result(eid) } onComplete { result => result match { case Success(status) => changeState.invoke(TaskFinished(tid, status)) case Failure(err) => changeState.invoke(TaskFailed(tid, err.toString)) } pullNext.invoke(()) } } override def onUpstreamFinish() = { log.info(s"${submissions} finished, completing stage after final build") val callback = getAsyncCallback[Unit]{_ => completeStage() } runningExecutionId.onComplete { _ => callback.invoke(()) } } }) setHandler(states, GraphStageLogic.IgnoreTerminateOutput) override def postStop() = { log.info("postStop") runningExecutionId foreach { eid => log.error("kill") executor.stop(eid) } } } } // class Builder( // listener: ActorRef, // exec: DockerExecutor, // mkdir: (BuildId, Int) => File, // mkout: (BuildId, Int) => OutputStream // ) extends Actor with ActorLogging { // import context._ // import Builder._ // val queue = Queue.empty[(BuildId, Int, BuildDef)] // val running = TrieMap.empty[(BuildId, Int), ExecutionId] // val Parallelism = 2 // var slots = Parallelism // def receive = { // case Cancel(bid) => // running.find(_._1._1 == bid).foreach { case (_, eid) => // // stopping will cause the pipeline to fail an thus remove itself from // // the running map // exec.stop(eid) // } // case Done => // slots += 1 // listener ! Next // case Next if slots > 0 => // slots -= 1 // val (bid, tid, bdf) = queue.dequeue() // val tdf = bdf.tasks(tid) // val image = tdf.environment match { // case DockerEnvironment(image) => image // case env => {log.error(s"can't run $env"); ???} // } // listener ! TaskStarting(bid, tid, tdf) // val pipeline = for ( // eid <- exec.start(image, tdf.script, mkdir(bid, tid), mkout(bid, tid)); // _ = listener ! TaskRunning(bid, tid, eid); // _ = running += (bid, tid) -> eid; // status <- exec.result(eid) // ) yield { // status // } // pipeline onComplete { res => // running -= ((bid, tid)) // self ! Done // res match { // case Success(status) => listener ! TaskFinished(bid, tid, status) // case Failure(err) => listener ! TaskFailed(bid, tid, err.toString) // } // } // case Next => // no slots available, do nothing // } // override def postStop() = { // for ((_, eid) <- running) { // exec.stop(eid) // } // } // } object Builder { //case class Submit(id: BuildId, build: BuildDef) //case class Cancel(id: BuildId) private case object Next private case object Done sealed trait BuildState { def taskId: TaskId } case class TaskStarting(taskId: TaskId, taskDef: TaskDef) extends BuildState case class TaskRunning(taskId: TaskId, execId: ExecutionId) extends BuildState case class TaskFinished(taskId: TaskId, status: Int) extends BuildState case class TaskFailed(taskId: TaskId, message: String) extends BuildState }