From bea923442ceb7cfaf855a3edb2e685ee843f3943 Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Sun, 9 Apr 2017 14:18:23 -0700 Subject: leaky builder --- .../src/main/scala/io/crashbox/ci/Builder.scala | 220 +++++++++++++++++++++ 1 file changed, 220 insertions(+) create mode 100644 crashboxd/src/main/scala/io/crashbox/ci/Builder.scala (limited to 'crashboxd/src/main/scala/io/crashbox/ci/Builder.scala') diff --git a/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala b/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala new file mode 100644 index 0000000..f77ce0a --- /dev/null +++ b/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala @@ -0,0 +1,220 @@ +package io.crashbox.ci + +import akka.actor.{ Actor, ActorLogging, ActorRef, ActorSystem } +import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler, StageLogging } +import akka.stream.{ Attributes, FanInShape2, Inlet, Outlet } +import akka.stream.stage.{ GraphStage } +import io.crashbox.ci.DockerExecutor.ExecutionId +import java.io.{ File, OutputStream } +import java.util.concurrent.atomic.AtomicInteger +import scala.collection.concurrent.TrieMap +import scala.collection.mutable.{ HashMap, Queue } +import scala.concurrent.Future +import scala.util.{ Failure, Success } + + +case class BuildId(id: String) extends AnyVal + +case class TaskId(buildId: String, taskIdx: Int) { + override def toString = s"$buildId#$taskIdx" +} + +class BuildStage( + executor: DockerExecutor, + mkdir: TaskId => File, + mkout: TaskId => OutputStream +) extends GraphStage[FanInShape2[(TaskId, TaskDef), TaskId, Builder.BuildState]] { + import Builder._ + + val submissions = Inlet[(TaskId, TaskDef)]("Builder.submissions") + val cancellations = Inlet[TaskId]("Builder.cancellations") + val states = Outlet[BuildState]("Builder.states") + + val shape = new FanInShape2(submissions, cancellations, states) + + def createLogic(attributes: Attributes) = new GraphStageLogic(shape) with StageLogging { + implicit def ec = materializer.executionContext + //import scala.concurrent.ExecutionContext.Implicits.global + + var runningTaskId: Option[TaskId] = None + var runningExecutionId: Future[ExecutionId] = + Future.failed(new RuntimeException("not started")) + + override def preStart(): Unit = { + log.info("Starting build stage") + pull(cancellations) + pull(submissions) + } + + setHandler(cancellations, new InHandler { + + override def onPush(): Unit = { + val tid = grab(cancellations) + if (runningTaskId == tid) { + runningExecutionId.foreach { eid => + executor.stop(eid) + } + } + pull(cancellations) + } + + // don't finish stage when cancellations closes + override def onUpstreamFinish() = {} + + }) + + setHandler(submissions, new InHandler { + + val changeState = getAsyncCallback[BuildState]{state => + log.info(s"${state.taskId} transitioned to $state") + emit(states, state) + } + val pullNext = getAsyncCallback[Unit]{ _ => + if (isAvailable(states)) { + pull(submissions) + } + } + + override def onPush(): Unit = { + val (tid, tdef) = grab(submissions) + log.info(s"$tid new submission $tdef") + + val image = tdef.environment match { + case DockerEnvironment(image) => image + case env => sys.error("Unsupported environemnt") + } + + changeState.invoke(TaskStarting(tid, tdef)) + runningTaskId = Some(tid) + runningExecutionId = executor.start( + image, + tdef.script, + mkdir(tid), + mkout(tid) + ).map{ eid => + changeState.invoke(TaskRunning(tid, eid)) + eid + } + + runningExecutionId.flatMap { eid => + executor.result(eid) + } onComplete { result => + result match { + case Success(status) => + changeState.invoke(TaskFinished(tid, status)) + case Failure(err) => + changeState.invoke(TaskFailed(tid, err.toString)) + } + pullNext.invoke(()) + } + } + + override def onUpstreamFinish() = { + log.info(s"${submissions} finished, completing stage after final build") + val callback = getAsyncCallback[Unit]{_ => + completeStage() + } + runningExecutionId.onComplete { _ => callback.invoke(()) } + } + + }) + + setHandler(states, GraphStageLogic.IgnoreTerminateOutput) + + override def postStop() = { + log.info("postStop") + runningExecutionId foreach { eid => + log.error("kill") + executor.stop(eid) + } + } + + } +} + +// class Builder( +// listener: ActorRef, +// exec: DockerExecutor, +// mkdir: (BuildId, Int) => File, +// mkout: (BuildId, Int) => OutputStream +// ) extends Actor with ActorLogging { +// import context._ +// import Builder._ + +// val queue = Queue.empty[(BuildId, Int, BuildDef)] +// val running = TrieMap.empty[(BuildId, Int), ExecutionId] + +// val Parallelism = 2 +// var slots = Parallelism + +// def receive = { + +// case Cancel(bid) => +// running.find(_._1._1 == bid).foreach { case (_, eid) => +// // stopping will cause the pipeline to fail an thus remove itself from +// // the running map +// exec.stop(eid) +// } + +// case Done => +// slots += 1 +// listener ! Next + +// case Next if slots > 0 => +// slots -= 1 +// val (bid, tid, bdf) = queue.dequeue() +// val tdf = bdf.tasks(tid) +// val image = tdf.environment match { +// case DockerEnvironment(image) => image +// case env => {log.error(s"can't run $env"); ???} +// } + +// listener ! TaskStarting(bid, tid, tdf) + +// val pipeline = for ( +// eid <- exec.start(image, tdf.script, mkdir(bid, tid), mkout(bid, tid)); +// _ = listener ! TaskRunning(bid, tid, eid); +// _ = running += (bid, tid) -> eid; +// status <- exec.result(eid) +// ) yield { +// status +// } +// pipeline onComplete { res => +// running -= ((bid, tid)) +// self ! Done +// res match { +// case Success(status) => listener ! TaskFinished(bid, tid, status) +// case Failure(err) => listener ! TaskFailed(bid, tid, err.toString) +// } +// } + +// case Next => // no slots available, do nothing + +// } + +// override def postStop() = { +// for ((_, eid) <- running) { +// exec.stop(eid) +// } +// } + +// } + +object Builder { + + //case class Submit(id: BuildId, build: BuildDef) + //case class Cancel(id: BuildId) + + private case object Next + private case object Done + + sealed trait BuildState { + def taskId: TaskId + } + case class TaskStarting(taskId: TaskId, taskDef: TaskDef) extends BuildState + case class TaskRunning(taskId: TaskId, execId: ExecutionId) extends BuildState + + case class TaskFinished(taskId: TaskId, status: Int) extends BuildState + case class TaskFailed(taskId: TaskId, message: String) extends BuildState + +} -- cgit v1.2.3