diff options
Diffstat (limited to 'crashboxd/src/main/scala/io')
-rw-r--r-- | crashboxd/src/main/scala/io/crashbox/ci/Builder.scala | 205 | ||||
-rw-r--r-- | crashboxd/src/main/scala/io/crashbox/ci/DockerExecutor.scala | 36 |
2 files changed, 53 insertions, 188 deletions
diff --git a/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala b/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala index f77ce0a..1e0c771 100644 --- a/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala +++ b/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala @@ -1,6 +1,7 @@ package io.crashbox.ci import akka.actor.{ Actor, ActorLogging, ActorRef, ActorSystem } +import akka.stream.SourceShape import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler, StageLogging } import akka.stream.{ Attributes, FanInShape2, Inlet, Outlet } import akka.stream.stage.{ GraphStage } @@ -19,195 +20,69 @@ case class TaskId(buildId: String, taskIdx: Int) { override def toString = s"$buildId#$taskIdx" } -class BuildStage( +class BuildSource( + taskId: TaskId, + taskDef: TaskDef, executor: DockerExecutor, - mkdir: TaskId => File, - mkout: TaskId => OutputStream -) extends GraphStage[FanInShape2[(TaskId, TaskDef), TaskId, Builder.BuildState]] { + mkdir: => File, + mkout: => OutputStream // TODO refactor this into a two-output stage +) extends GraphStage[SourceShape[Builder.BuildState]] { import Builder._ - val submissions = Inlet[(TaskId, TaskDef)]("Builder.submissions") - val cancellations = Inlet[TaskId]("Builder.cancellations") val states = Outlet[BuildState]("Builder.states") - val shape = new FanInShape2(submissions, cancellations, states) + val shape = new SourceShape(states) def createLogic(attributes: Attributes) = new GraphStageLogic(shape) with StageLogging { implicit def ec = materializer.executionContext - //import scala.concurrent.ExecutionContext.Implicits.global - var runningTaskId: Option[TaskId] = None - var runningExecutionId: Future[ExecutionId] = - Future.failed(new RuntimeException("not started")) + lazy val instance: Future[ExecutionId] = executor.start( + taskDef.environment.asInstanceOf[DockerEnvironment].image, + taskDef.script, + mkdir, + mkout + ) - override def preStart(): Unit = { - log.info("Starting build stage") - pull(cancellations) - pull(submissions) + val changeState = getAsyncCallback[BuildState]{state => + log.info(s"${state.taskId} transitioned to $state") + emit(states, state) } - setHandler(cancellations, new InHandler { - - override def onPush(): Unit = { - val tid = grab(cancellations) - if (runningTaskId == tid) { - runningExecutionId.foreach { eid => - executor.stop(eid) - } - } - pull(cancellations) - } - - // don't finish stage when cancellations closes - override def onUpstreamFinish() = {} - - }) - - setHandler(submissions, new InHandler { - - val changeState = getAsyncCallback[BuildState]{state => - log.info(s"${state.taskId} transitioned to $state") - emit(states, state) - } - val pullNext = getAsyncCallback[Unit]{ _ => - if (isAvailable(states)) { - pull(submissions) - } - } - - override def onPush(): Unit = { - val (tid, tdef) = grab(submissions) - log.info(s"$tid new submission $tdef") - - val image = tdef.environment match { - case DockerEnvironment(image) => image - case env => sys.error("Unsupported environemnt") - } - - changeState.invoke(TaskStarting(tid, tdef)) - runningTaskId = Some(tid) - runningExecutionId = executor.start( - image, - tdef.script, - mkdir(tid), - mkout(tid) - ).map{ eid => - changeState.invoke(TaskRunning(tid, eid)) - eid - } - - runningExecutionId.flatMap { eid => - executor.result(eid) - } onComplete { result => - result match { - case Success(status) => - changeState.invoke(TaskFinished(tid, status)) - case Failure(err) => - changeState.invoke(TaskFailed(tid, err.toString)) - } - pullNext.invoke(()) - } - } + val asyncClose = getAsyncCallback[Unit]{_ => + completeStage() + } - override def onUpstreamFinish() = { - log.info(s"${submissions} finished, completing stage after final build") - val callback = getAsyncCallback[Unit]{_ => - completeStage() + override def preStart() = { + val pipeline = for ( + _ <- Future.unit; + _ = changeState.invoke(TaskStarting(taskId, taskDef)); + eid <- instance; + _ = changeState.invoke(TaskRunning(taskId, eid)); + status <- executor.result(eid) + ) yield status + + pipeline onComplete { result => + result match { + case Success(status) => + changeState.invoke(TaskFinished(taskId, status)) + case Failure(err) => + changeState.invoke(TaskFailed(taskId, err.toString)) } - runningExecutionId.onComplete { _ => callback.invoke(()) } + asyncClose.invoke(()) } - - }) - - setHandler(states, GraphStageLogic.IgnoreTerminateOutput) + } override def postStop() = { - log.info("postStop") - runningExecutionId foreach { eid => - log.error("kill") - executor.stop(eid) - } + instance.foreach{ eid => executor.stop(eid)} } + setHandler(states, GraphStageLogic.IgnoreTerminateOutput) + } } -// class Builder( -// listener: ActorRef, -// exec: DockerExecutor, -// mkdir: (BuildId, Int) => File, -// mkout: (BuildId, Int) => OutputStream -// ) extends Actor with ActorLogging { -// import context._ -// import Builder._ - -// val queue = Queue.empty[(BuildId, Int, BuildDef)] -// val running = TrieMap.empty[(BuildId, Int), ExecutionId] - -// val Parallelism = 2 -// var slots = Parallelism - -// def receive = { - -// case Cancel(bid) => -// running.find(_._1._1 == bid).foreach { case (_, eid) => -// // stopping will cause the pipeline to fail an thus remove itself from -// // the running map -// exec.stop(eid) -// } - -// case Done => -// slots += 1 -// listener ! Next - -// case Next if slots > 0 => -// slots -= 1 -// val (bid, tid, bdf) = queue.dequeue() -// val tdf = bdf.tasks(tid) -// val image = tdf.environment match { -// case DockerEnvironment(image) => image -// case env => {log.error(s"can't run $env"); ???} -// } - -// listener ! TaskStarting(bid, tid, tdf) - -// val pipeline = for ( -// eid <- exec.start(image, tdf.script, mkdir(bid, tid), mkout(bid, tid)); -// _ = listener ! TaskRunning(bid, tid, eid); -// _ = running += (bid, tid) -> eid; -// status <- exec.result(eid) -// ) yield { -// status -// } -// pipeline onComplete { res => -// running -= ((bid, tid)) -// self ! Done -// res match { -// case Success(status) => listener ! TaskFinished(bid, tid, status) -// case Failure(err) => listener ! TaskFailed(bid, tid, err.toString) -// } -// } - -// case Next => // no slots available, do nothing - -// } - -// override def postStop() = { -// for ((_, eid) <- running) { -// exec.stop(eid) -// } -// } - -// } - object Builder { - //case class Submit(id: BuildId, build: BuildDef) - //case class Cancel(id: BuildId) - - private case object Next - private case object Done - sealed trait BuildState { def taskId: TaskId } diff --git a/crashboxd/src/main/scala/io/crashbox/ci/DockerExecutor.scala b/crashboxd/src/main/scala/io/crashbox/ci/DockerExecutor.scala index 5abf7ae..1646777 100644 --- a/crashboxd/src/main/scala/io/crashbox/ci/DockerExecutor.scala +++ b/crashboxd/src/main/scala/io/crashbox/ci/DockerExecutor.scala @@ -1,6 +1,7 @@ package io.crashbox.ci import java.io.{File, OutputStream} +import java.util.UUID import scala.collection.JavaConverters._ import scala.concurrent.Future @@ -12,6 +13,7 @@ import com.spotify.docker.client.DockerClient.{ AttachParameter, ListContainersParam } +import scala.util.Random import com.spotify.docker.client.LogStream import com.spotify.docker.client.exceptions.ContainerNotFoundException import com.spotify.docker.client.messages.{ContainerConfig, HostConfig} @@ -40,22 +42,7 @@ class DockerExecutor(uri: String = "unix:///run/docker.sock")( c } - /* - def makeImage() = { - val returnedImageId = dockerClient.build( - Paths.get("docker directory"), "test", new ProgressHandler() { - override def progress(message: ProgressMessage) = { - val imageId = message.buildImageId() - message.buildImageId() - if (imageId != null) { - //imageIdFromMessage.set(imageId); - } - } - }) - //dockerClient.build - }*/ - - def label = "build" + private val label = "crashbox-executor" -> UUID.randomUUID().toString def start( image: String, @@ -72,7 +59,7 @@ class DockerExecutor(uri: String = "unix:///run/docker.sock")( val hostConfig = HostConfig.builder().binds(volume).build() val containerConfig = ContainerConfig .builder() - .labels(Map("crashbox" -> label).asJava) + .labels(Map(label).asJava) .hostConfig(hostConfig) .tty(true) // combine stdout and stderr into stdout .image(image) @@ -126,16 +113,19 @@ class DockerExecutor(uri: String = "unix:///run/docker.sock")( } } - def reapDeadBuilds(): Unit = { + def clean(): Boolean = { val stale = dockerClient .listContainers( - ListContainersParam.withLabel("crashbox"), - ListContainersParam.withStatusExited() + ListContainersParam.withLabel(label._1, label._2) ) .asScala - stale.foreach { container => - log.warning(s"Removing stale container ${container.id}") - dockerClient.removeContainer(container.id) + stale.isEmpty || { + stale.foreach { container => + log.warning(s"Removing stale container ${container.id}") + dockerClient.killContainer(container.id) + dockerClient.removeContainer(container.id) + } + false } } |