From bea923442ceb7cfaf855a3edb2e685ee843f3943 Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Sun, 9 Apr 2017 14:18:23 -0700 Subject: leaky builder --- .../src/main/scala/io/crashbox/ci/Builder.scala | 220 +++++++++++++++++++++ .../main/scala/io/crashbox/ci/DockerExecutor.scala | 6 +- .../test/scala/io/crashbox/ci/BuildStageSpec.scala | 63 ++++++ .../scala/io/crashbox/ci/DockerExecutorSpec.scala | 78 +++++++- 4 files changed, 358 insertions(+), 9 deletions(-) create mode 100644 crashboxd/src/main/scala/io/crashbox/ci/Builder.scala create mode 100644 crashboxd/src/test/scala/io/crashbox/ci/BuildStageSpec.scala diff --git a/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala b/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala new file mode 100644 index 0000000..f77ce0a --- /dev/null +++ b/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala @@ -0,0 +1,220 @@ +package io.crashbox.ci + +import akka.actor.{ Actor, ActorLogging, ActorRef, ActorSystem } +import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler, StageLogging } +import akka.stream.{ Attributes, FanInShape2, Inlet, Outlet } +import akka.stream.stage.{ GraphStage } +import io.crashbox.ci.DockerExecutor.ExecutionId +import java.io.{ File, OutputStream } +import java.util.concurrent.atomic.AtomicInteger +import scala.collection.concurrent.TrieMap +import scala.collection.mutable.{ HashMap, Queue } +import scala.concurrent.Future +import scala.util.{ Failure, Success } + + +case class BuildId(id: String) extends AnyVal + +case class TaskId(buildId: String, taskIdx: Int) { + override def toString = s"$buildId#$taskIdx" +} + +class BuildStage( + executor: DockerExecutor, + mkdir: TaskId => File, + mkout: TaskId => OutputStream +) extends GraphStage[FanInShape2[(TaskId, TaskDef), TaskId, Builder.BuildState]] { + import Builder._ + + val submissions = Inlet[(TaskId, TaskDef)]("Builder.submissions") + val cancellations = Inlet[TaskId]("Builder.cancellations") + val states = Outlet[BuildState]("Builder.states") + + val shape = new FanInShape2(submissions, cancellations, states) + + def createLogic(attributes: Attributes) = new GraphStageLogic(shape) with StageLogging { + implicit def ec = materializer.executionContext + //import scala.concurrent.ExecutionContext.Implicits.global + + var runningTaskId: Option[TaskId] = None + var runningExecutionId: Future[ExecutionId] = + Future.failed(new RuntimeException("not started")) + + override def preStart(): Unit = { + log.info("Starting build stage") + pull(cancellations) + pull(submissions) + } + + setHandler(cancellations, new InHandler { + + override def onPush(): Unit = { + val tid = grab(cancellations) + if (runningTaskId == tid) { + runningExecutionId.foreach { eid => + executor.stop(eid) + } + } + pull(cancellations) + } + + // don't finish stage when cancellations closes + override def onUpstreamFinish() = {} + + }) + + setHandler(submissions, new InHandler { + + val changeState = getAsyncCallback[BuildState]{state => + log.info(s"${state.taskId} transitioned to $state") + emit(states, state) + } + val pullNext = getAsyncCallback[Unit]{ _ => + if (isAvailable(states)) { + pull(submissions) + } + } + + override def onPush(): Unit = { + val (tid, tdef) = grab(submissions) + log.info(s"$tid new submission $tdef") + + val image = tdef.environment match { + case DockerEnvironment(image) => image + case env => sys.error("Unsupported environemnt") + } + + changeState.invoke(TaskStarting(tid, tdef)) + runningTaskId = Some(tid) + runningExecutionId = executor.start( + image, + tdef.script, + mkdir(tid), + mkout(tid) + ).map{ eid => + changeState.invoke(TaskRunning(tid, eid)) + eid + } + + runningExecutionId.flatMap { eid => + executor.result(eid) + } onComplete { result => + result match { + case Success(status) => + changeState.invoke(TaskFinished(tid, status)) + case Failure(err) => + changeState.invoke(TaskFailed(tid, err.toString)) + } + pullNext.invoke(()) + } + } + + override def onUpstreamFinish() = { + log.info(s"${submissions} finished, completing stage after final build") + val callback = getAsyncCallback[Unit]{_ => + completeStage() + } + runningExecutionId.onComplete { _ => callback.invoke(()) } + } + + }) + + setHandler(states, GraphStageLogic.IgnoreTerminateOutput) + + override def postStop() = { + log.info("postStop") + runningExecutionId foreach { eid => + log.error("kill") + executor.stop(eid) + } + } + + } +} + +// class Builder( +// listener: ActorRef, +// exec: DockerExecutor, +// mkdir: (BuildId, Int) => File, +// mkout: (BuildId, Int) => OutputStream +// ) extends Actor with ActorLogging { +// import context._ +// import Builder._ + +// val queue = Queue.empty[(BuildId, Int, BuildDef)] +// val running = TrieMap.empty[(BuildId, Int), ExecutionId] + +// val Parallelism = 2 +// var slots = Parallelism + +// def receive = { + +// case Cancel(bid) => +// running.find(_._1._1 == bid).foreach { case (_, eid) => +// // stopping will cause the pipeline to fail an thus remove itself from +// // the running map +// exec.stop(eid) +// } + +// case Done => +// slots += 1 +// listener ! Next + +// case Next if slots > 0 => +// slots -= 1 +// val (bid, tid, bdf) = queue.dequeue() +// val tdf = bdf.tasks(tid) +// val image = tdf.environment match { +// case DockerEnvironment(image) => image +// case env => {log.error(s"can't run $env"); ???} +// } + +// listener ! TaskStarting(bid, tid, tdf) + +// val pipeline = for ( +// eid <- exec.start(image, tdf.script, mkdir(bid, tid), mkout(bid, tid)); +// _ = listener ! TaskRunning(bid, tid, eid); +// _ = running += (bid, tid) -> eid; +// status <- exec.result(eid) +// ) yield { +// status +// } +// pipeline onComplete { res => +// running -= ((bid, tid)) +// self ! Done +// res match { +// case Success(status) => listener ! TaskFinished(bid, tid, status) +// case Failure(err) => listener ! TaskFailed(bid, tid, err.toString) +// } +// } + +// case Next => // no slots available, do nothing + +// } + +// override def postStop() = { +// for ((_, eid) <- running) { +// exec.stop(eid) +// } +// } + +// } + +object Builder { + + //case class Submit(id: BuildId, build: BuildDef) + //case class Cancel(id: BuildId) + + private case object Next + private case object Done + + sealed trait BuildState { + def taskId: TaskId + } + case class TaskStarting(taskId: TaskId, taskDef: TaskDef) extends BuildState + case class TaskRunning(taskId: TaskId, execId: ExecutionId) extends BuildState + + case class TaskFinished(taskId: TaskId, status: Int) extends BuildState + case class TaskFailed(taskId: TaskId, message: String) extends BuildState + +} diff --git a/crashboxd/src/main/scala/io/crashbox/ci/DockerExecutor.scala b/crashboxd/src/main/scala/io/crashbox/ci/DockerExecutor.scala index 3c24b06..5abf7ae 100644 --- a/crashboxd/src/main/scala/io/crashbox/ci/DockerExecutor.scala +++ b/crashboxd/src/main/scala/io/crashbox/ci/DockerExecutor.scala @@ -53,7 +53,9 @@ class DockerExecutor(uri: String = "unix:///run/docker.sock")( } }) //dockerClient.build - }*/ + }*/ + + def label = "build" def start( image: String, @@ -70,7 +72,7 @@ class DockerExecutor(uri: String = "unix:///run/docker.sock")( val hostConfig = HostConfig.builder().binds(volume).build() val containerConfig = ContainerConfig .builder() - .labels(Map("crashbox" -> "build").asJava) + .labels(Map("crashbox" -> label).asJava) .hostConfig(hostConfig) .tty(true) // combine stdout and stderr into stdout .image(image) diff --git a/crashboxd/src/test/scala/io/crashbox/ci/BuildStageSpec.scala b/crashboxd/src/test/scala/io/crashbox/ci/BuildStageSpec.scala new file mode 100644 index 0000000..a3a68b6 --- /dev/null +++ b/crashboxd/src/test/scala/io/crashbox/ci/BuildStageSpec.scala @@ -0,0 +1,63 @@ +package io.crashbox.ci + +import akka.actor.ActorSystem +import akka.stream.{ ClosedShape, KillSwitch } +import akka.stream.scaladsl.{ GraphDSL, RunnableGraph, Sink, Source } +import akka.stream.{ ActorMaterializer, FanInShape2 } +import java.io.{ ByteArrayOutputStream, File } +import java.nio.file.Files +import org.scalatest._ +import scala.concurrent.Await +import scala.concurrent.duration._ + +class BuildStageSpec extends FlatSpec with Matchers with DockerSuite{ + + implicit val materializer = ActorMaterializer() + + val exec = new DockerExecutor + + def withTmp[A](action: (File, ByteArrayOutputStream) => A): A = { + val dir = Files.createTempDirectory("crashbox-build-stage-test").toFile + val out = new ByteArrayOutputStream(1024) + try action(dir, out) + finally dir.delete() + } + + "BuildStage" should "have a test!" in { + withTmp{ case (dir, out) => + val taskDef = TaskDef(DockerEnvironment("crashbox"), "sleep 100; exit 0") + + val resultSink = Sink.foreach[Builder.BuildState](x => println(x)) + + val graph = RunnableGraph.fromGraph(GraphDSL.create(resultSink) { + implicit b => sink => + import GraphDSL.Implicits._ + + val builder = b.add(new BuildStage(exec, _ => dir, _ => out)) + + val submissions = b.add( + Source.repeat(TaskId("123", 2) -> taskDef)) + val cancellations = b.add( + Source.tick(10.seconds, 10.seconds, TaskId("0", 0))) + + val ks = b.add(KillSwitch) + + submissions ~> builder.in0 + cancellations ~> builder.in1 + + builder.out ~> sink + + ClosedShape + }) + + graph.run() + Thread.sleep(30000) + println("terminating") + Await.result(system.terminate(), 60.seconds) + println("teminated") + Thread.sleep(5000) + println("eot") + } + + } +} diff --git a/crashboxd/src/test/scala/io/crashbox/ci/DockerExecutorSpec.scala b/crashboxd/src/test/scala/io/crashbox/ci/DockerExecutorSpec.scala index 2b6dce7..23cbced 100644 --- a/crashboxd/src/test/scala/io/crashbox/ci/DockerExecutorSpec.scala +++ b/crashboxd/src/test/scala/io/crashbox/ci/DockerExecutorSpec.scala @@ -1,5 +1,6 @@ package io.crashbox.ci +import com.spotify.docker.client.DockerClient.ListContainersParam import java.io.{ByteArrayOutputStream, File} import java.nio.file.Files @@ -9,6 +10,69 @@ import scala.concurrent.duration._ import akka.actor.ActorSystem import org.scalatest._ +import scala.util.Random + + +trait DockerSuite extends Suite with BeforeAndAfterAll { self => + + private val name = self.toString() + + private def withTmp[A](action: File => A): A = { + val dir = Files.createTempDirectory("crashbox-docker-test-" + name).toFile + try action(dir) + finally dir.delete() + } + + val baseImage = "debian:jessie-backports" + val dockerImage = "crashbox" + val dockerTimeout = 30.seconds + val dockerLabel = "test-" + Random.nextInt() + + implicit val system = ActorSystem("crashbox-docker-test-" + name) + import system.dispatcher + val executor = new DockerExecutor { + override def label = dockerLabel + } + + def buildImage(): Unit = { + println("Pulling base docker image for running docker tests") + executor.dockerClient.pull(baseImage) + + withTmp { dir => + println("Adapting base image for tests") + val modifications = s"""|FROM $baseImage + |RUN adduser crashbox + |USER crashbox + |""".stripMargin + Files.write((new File(dir, "Dockerfile")).toPath, modifications.getBytes) + executor.dockerClient.build(dir.toPath, dockerImage) + } + } + + def runningDockers: Seq[String] = { + val stale = executor.dockerClient + .listContainers( + ListContainersParam.withLabel("crashbox", dockerLabel) + ).asScala + stale.map(_.id()) + } + + override def beforeAll: Unit = { + buildImage() + } + + override def afterAll: Unit = { + val running = runningDockers + running.foreach { id => + executor.dockerClient.stopContainer(id, 0) + executor.dockerClient.removeContainer(id) + } + require(running.isEmpty, "Docker containers were left running after unit tests") + require(runningDockers.isEmpty, "Could not delete left over docker containers.") + } + +} + class DockerExecutorSpec extends FlatSpec @@ -52,13 +116,13 @@ class DockerExecutorSpec def run[A](script: String)(tests: (Int, File, String) => A): A = withTmp { dir => - val out = new ByteArrayOutputStream(1024) - val awaitable = for (id <- exec.start(image, script, dir, out); - status <- exec.result(id)) yield { - status - } - val status = Await.result(awaitable, timeout) - tests(status, dir, new String(out.toByteArray()).trim()) + val out = new ByteArrayOutputStream(1024) + val awaitable = for (id <- exec.start(image, script, dir, out); + status <- exec.result(id)) yield { + status + } + val status = Await.result(awaitable, timeout) + tests(status, dir, new String(out.toByteArray()).trim()) } "DockerExecutor" should "return expected exit codes" in { -- cgit v1.2.3