diff options
Diffstat (limited to 'crashboxd/src/main/scala/io/crashbox/ci/Builder.scala')
-rw-r--r-- | crashboxd/src/main/scala/io/crashbox/ci/Builder.scala | 205 |
1 files changed, 40 insertions, 165 deletions
diff --git a/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala b/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala index f77ce0a..1e0c771 100644 --- a/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala +++ b/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala @@ -1,6 +1,7 @@ package io.crashbox.ci import akka.actor.{ Actor, ActorLogging, ActorRef, ActorSystem } +import akka.stream.SourceShape import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler, StageLogging } import akka.stream.{ Attributes, FanInShape2, Inlet, Outlet } import akka.stream.stage.{ GraphStage } @@ -19,195 +20,69 @@ case class TaskId(buildId: String, taskIdx: Int) { override def toString = s"$buildId#$taskIdx" } -class BuildStage( +class BuildSource( + taskId: TaskId, + taskDef: TaskDef, executor: DockerExecutor, - mkdir: TaskId => File, - mkout: TaskId => OutputStream -) extends GraphStage[FanInShape2[(TaskId, TaskDef), TaskId, Builder.BuildState]] { + mkdir: => File, + mkout: => OutputStream // TODO refactor this into a two-output stage +) extends GraphStage[SourceShape[Builder.BuildState]] { import Builder._ - val submissions = Inlet[(TaskId, TaskDef)]("Builder.submissions") - val cancellations = Inlet[TaskId]("Builder.cancellations") val states = Outlet[BuildState]("Builder.states") - val shape = new FanInShape2(submissions, cancellations, states) + val shape = new SourceShape(states) def createLogic(attributes: Attributes) = new GraphStageLogic(shape) with StageLogging { implicit def ec = materializer.executionContext - //import scala.concurrent.ExecutionContext.Implicits.global - var runningTaskId: Option[TaskId] = None - var runningExecutionId: Future[ExecutionId] = - Future.failed(new RuntimeException("not started")) + lazy val instance: Future[ExecutionId] = executor.start( + taskDef.environment.asInstanceOf[DockerEnvironment].image, + taskDef.script, + mkdir, + mkout + ) - override def preStart(): Unit = { - log.info("Starting build stage") - pull(cancellations) - pull(submissions) + val changeState = getAsyncCallback[BuildState]{state => + log.info(s"${state.taskId} transitioned to $state") + emit(states, state) } - setHandler(cancellations, new InHandler { - - override def onPush(): Unit = { - val tid = grab(cancellations) - if (runningTaskId == tid) { - runningExecutionId.foreach { eid => - executor.stop(eid) - } - } - pull(cancellations) - } - - // don't finish stage when cancellations closes - override def onUpstreamFinish() = {} - - }) - - setHandler(submissions, new InHandler { - - val changeState = getAsyncCallback[BuildState]{state => - log.info(s"${state.taskId} transitioned to $state") - emit(states, state) - } - val pullNext = getAsyncCallback[Unit]{ _ => - if (isAvailable(states)) { - pull(submissions) - } - } - - override def onPush(): Unit = { - val (tid, tdef) = grab(submissions) - log.info(s"$tid new submission $tdef") - - val image = tdef.environment match { - case DockerEnvironment(image) => image - case env => sys.error("Unsupported environemnt") - } - - changeState.invoke(TaskStarting(tid, tdef)) - runningTaskId = Some(tid) - runningExecutionId = executor.start( - image, - tdef.script, - mkdir(tid), - mkout(tid) - ).map{ eid => - changeState.invoke(TaskRunning(tid, eid)) - eid - } - - runningExecutionId.flatMap { eid => - executor.result(eid) - } onComplete { result => - result match { - case Success(status) => - changeState.invoke(TaskFinished(tid, status)) - case Failure(err) => - changeState.invoke(TaskFailed(tid, err.toString)) - } - pullNext.invoke(()) - } - } + val asyncClose = getAsyncCallback[Unit]{_ => + completeStage() + } - override def onUpstreamFinish() = { - log.info(s"${submissions} finished, completing stage after final build") - val callback = getAsyncCallback[Unit]{_ => - completeStage() + override def preStart() = { + val pipeline = for ( + _ <- Future.unit; + _ = changeState.invoke(TaskStarting(taskId, taskDef)); + eid <- instance; + _ = changeState.invoke(TaskRunning(taskId, eid)); + status <- executor.result(eid) + ) yield status + + pipeline onComplete { result => + result match { + case Success(status) => + changeState.invoke(TaskFinished(taskId, status)) + case Failure(err) => + changeState.invoke(TaskFailed(taskId, err.toString)) } - runningExecutionId.onComplete { _ => callback.invoke(()) } + asyncClose.invoke(()) } - - }) - - setHandler(states, GraphStageLogic.IgnoreTerminateOutput) + } override def postStop() = { - log.info("postStop") - runningExecutionId foreach { eid => - log.error("kill") - executor.stop(eid) - } + instance.foreach{ eid => executor.stop(eid)} } + setHandler(states, GraphStageLogic.IgnoreTerminateOutput) + } } -// class Builder( -// listener: ActorRef, -// exec: DockerExecutor, -// mkdir: (BuildId, Int) => File, -// mkout: (BuildId, Int) => OutputStream -// ) extends Actor with ActorLogging { -// import context._ -// import Builder._ - -// val queue = Queue.empty[(BuildId, Int, BuildDef)] -// val running = TrieMap.empty[(BuildId, Int), ExecutionId] - -// val Parallelism = 2 -// var slots = Parallelism - -// def receive = { - -// case Cancel(bid) => -// running.find(_._1._1 == bid).foreach { case (_, eid) => -// // stopping will cause the pipeline to fail an thus remove itself from -// // the running map -// exec.stop(eid) -// } - -// case Done => -// slots += 1 -// listener ! Next - -// case Next if slots > 0 => -// slots -= 1 -// val (bid, tid, bdf) = queue.dequeue() -// val tdf = bdf.tasks(tid) -// val image = tdf.environment match { -// case DockerEnvironment(image) => image -// case env => {log.error(s"can't run $env"); ???} -// } - -// listener ! TaskStarting(bid, tid, tdf) - -// val pipeline = for ( -// eid <- exec.start(image, tdf.script, mkdir(bid, tid), mkout(bid, tid)); -// _ = listener ! TaskRunning(bid, tid, eid); -// _ = running += (bid, tid) -> eid; -// status <- exec.result(eid) -// ) yield { -// status -// } -// pipeline onComplete { res => -// running -= ((bid, tid)) -// self ! Done -// res match { -// case Success(status) => listener ! TaskFinished(bid, tid, status) -// case Failure(err) => listener ! TaskFailed(bid, tid, err.toString) -// } -// } - -// case Next => // no slots available, do nothing - -// } - -// override def postStop() = { -// for ((_, eid) <- running) { -// exec.stop(eid) -// } -// } - -// } - object Builder { - //case class Submit(id: BuildId, build: BuildDef) - //case class Cancel(id: BuildId) - - private case object Next - private case object Done - sealed trait BuildState { def taskId: TaskId } |