path: root/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala
diff options
authorJakob Odersky <>2017-04-09 16:24:48 -0700
committerJakob Odersky <>2017-04-09 16:24:48 -0700
commit382152098459a5783e3a794161ed2da2a321af37 (patch)
tree40d2f57afcfbeb168d9fb00b2e14b1b6ffab3069 /crashboxd/src/main/scala/io/crashbox/ci/Builder.scala
parentbea923442ceb7cfaf855a3edb2e685ee843f3943 (diff)
less leaky
Diffstat (limited to 'crashboxd/src/main/scala/io/crashbox/ci/Builder.scala')
1 files changed, 40 insertions, 165 deletions
diff --git a/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala b/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala
index f77ce0a..1e0c771 100644
--- a/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala
+++ b/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala
@@ -1,6 +1,7 @@
import{ Actor, ActorLogging, ActorRef, ActorSystem }
import{ GraphStageLogic, InHandler, OutHandler, StageLogging }
import{ Attributes, FanInShape2, Inlet, Outlet }
import{ GraphStage }
@@ -19,195 +20,69 @@ case class TaskId(buildId: String, taskIdx: Int) {
override def toString = s"$buildId#$taskIdx"
-class BuildStage(
+class BuildSource(
+ taskId: TaskId,
+ taskDef: TaskDef,
executor: DockerExecutor,
- mkdir: TaskId => File,
- mkout: TaskId => OutputStream
-) extends GraphStage[FanInShape2[(TaskId, TaskDef), TaskId, Builder.BuildState]] {
+ mkdir: => File,
+ mkout: => OutputStream // TODO refactor this into a two-output stage
+) extends GraphStage[SourceShape[Builder.BuildState]] {
import Builder._
- val submissions = Inlet[(TaskId, TaskDef)]("Builder.submissions")
- val cancellations = Inlet[TaskId]("Builder.cancellations")
val states = Outlet[BuildState]("Builder.states")
- val shape = new FanInShape2(submissions, cancellations, states)
+ val shape = new SourceShape(states)
def createLogic(attributes: Attributes) = new GraphStageLogic(shape) with StageLogging {
implicit def ec = materializer.executionContext
- //import
- var runningTaskId: Option[TaskId] = None
- var runningExecutionId: Future[ExecutionId] =
- Future.failed(new RuntimeException("not started"))
+ lazy val instance: Future[ExecutionId] = executor.start(
+ taskDef.environment.asInstanceOf[DockerEnvironment].image,
+ taskDef.script,
+ mkdir,
+ mkout
+ )
- override def preStart(): Unit = {
-"Starting build stage")
- pull(cancellations)
- pull(submissions)
+ val changeState = getAsyncCallback[BuildState]{state =>
+"${state.taskId} transitioned to $state")
+ emit(states, state)
- setHandler(cancellations, new InHandler {
- override def onPush(): Unit = {
- val tid = grab(cancellations)
- if (runningTaskId == tid) {
- runningExecutionId.foreach { eid =>
- executor.stop(eid)
- }
- }
- pull(cancellations)
- }
- // don't finish stage when cancellations closes
- override def onUpstreamFinish() = {}
- })
- setHandler(submissions, new InHandler {
- val changeState = getAsyncCallback[BuildState]{state =>
-"${state.taskId} transitioned to $state")
- emit(states, state)
- }
- val pullNext = getAsyncCallback[Unit]{ _ =>
- if (isAvailable(states)) {
- pull(submissions)
- }
- }
- override def onPush(): Unit = {
- val (tid, tdef) = grab(submissions)
-"$tid new submission $tdef")
- val image = tdef.environment match {
- case DockerEnvironment(image) => image
- case env => sys.error("Unsupported environemnt")
- }
- changeState.invoke(TaskStarting(tid, tdef))
- runningTaskId = Some(tid)
- runningExecutionId = executor.start(
- image,
- tdef.script,
- mkdir(tid),
- mkout(tid)
- ).map{ eid =>
- changeState.invoke(TaskRunning(tid, eid))
- eid
- }
- runningExecutionId.flatMap { eid =>
- executor.result(eid)
- } onComplete { result =>
- result match {
- case Success(status) =>
- changeState.invoke(TaskFinished(tid, status))
- case Failure(err) =>
- changeState.invoke(TaskFailed(tid, err.toString))
- }
- pullNext.invoke(())
- }
- }
+ val asyncClose = getAsyncCallback[Unit]{_ =>
+ completeStage()
+ }
- override def onUpstreamFinish() = {
-"${submissions} finished, completing stage after final build")
- val callback = getAsyncCallback[Unit]{_ =>
- completeStage()
+ override def preStart() = {
+ val pipeline = for (
+ _ <- Future.unit;
+ _ = changeState.invoke(TaskStarting(taskId, taskDef));
+ eid <- instance;
+ _ = changeState.invoke(TaskRunning(taskId, eid));
+ status <- executor.result(eid)
+ ) yield status
+ pipeline onComplete { result =>
+ result match {
+ case Success(status) =>
+ changeState.invoke(TaskFinished(taskId, status))
+ case Failure(err) =>
+ changeState.invoke(TaskFailed(taskId, err.toString))
- runningExecutionId.onComplete { _ => callback.invoke(()) }
+ asyncClose.invoke(())
- })
- setHandler(states, GraphStageLogic.IgnoreTerminateOutput)
+ }
override def postStop() = {
- runningExecutionId foreach { eid =>
- log.error("kill")
- executor.stop(eid)
- }
+ instance.foreach{ eid => executor.stop(eid)}
+ setHandler(states, GraphStageLogic.IgnoreTerminateOutput)
-// class Builder(
-// listener: ActorRef,
-// exec: DockerExecutor,
-// mkdir: (BuildId, Int) => File,
-// mkout: (BuildId, Int) => OutputStream
-// ) extends Actor with ActorLogging {
-// import context._
-// import Builder._
-// val queue = Queue.empty[(BuildId, Int, BuildDef)]
-// val running = TrieMap.empty[(BuildId, Int), ExecutionId]
-// val Parallelism = 2
-// var slots = Parallelism
-// def receive = {
-// case Cancel(bid) =>
-// running.find(_._1._1 == bid).foreach { case (_, eid) =>
-// // stopping will cause the pipeline to fail an thus remove itself from
-// // the running map
-// exec.stop(eid)
-// }
-// case Done =>
-// slots += 1
-// listener ! Next
-// case Next if slots > 0 =>
-// slots -= 1
-// val (bid, tid, bdf) = queue.dequeue()
-// val tdf = bdf.tasks(tid)
-// val image = tdf.environment match {
-// case DockerEnvironment(image) => image
-// case env => {log.error(s"can't run $env"); ???}
-// }
-// listener ! TaskStarting(bid, tid, tdf)
-// val pipeline = for (
-// eid <- exec.start(image, tdf.script, mkdir(bid, tid), mkout(bid, tid));
-// _ = listener ! TaskRunning(bid, tid, eid);
-// _ = running += (bid, tid) -> eid;
-// status <- exec.result(eid)
-// ) yield {
-// status
-// }
-// pipeline onComplete { res =>
-// running -= ((bid, tid))
-// self ! Done
-// res match {
-// case Success(status) => listener ! TaskFinished(bid, tid, status)
-// case Failure(err) => listener ! TaskFailed(bid, tid, err.toString)
-// }
-// }
-// case Next => // no slots available, do nothing
-// }
-// override def postStop() = {
-// for ((_, eid) <- running) {
-// exec.stop(eid)
-// }
-// }
-// }
object Builder {
- //case class Submit(id: BuildId, build: BuildDef)
- //case class Cancel(id: BuildId)
- private case object Next
- private case object Done
sealed trait BuildState {
def taskId: TaskId