summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJakob Odersky <jakob@odersky.com>2017-04-09 14:18:23 -0700
committerJakob Odersky <jakob@odersky.com>2017-04-09 14:18:23 -0700
commitbea923442ceb7cfaf855a3edb2e685ee843f3943 (patch)
treef92483d07814854ec1dd6d8cd7d98d69e0d30323
parent26aa8adc30a84d983d020e34b488ac22a31cb544 (diff)
downloadcrashbox-ci-bea923442ceb7cfaf855a3edb2e685ee843f3943.tar.gz
crashbox-ci-bea923442ceb7cfaf855a3edb2e685ee843f3943.tar.bz2
crashbox-ci-bea923442ceb7cfaf855a3edb2e685ee843f3943.zip
leaky builder
-rw-r--r--crashboxd/src/main/scala/io/crashbox/ci/Builder.scala220
-rw-r--r--crashboxd/src/main/scala/io/crashbox/ci/DockerExecutor.scala6
-rw-r--r--crashboxd/src/test/scala/io/crashbox/ci/BuildStageSpec.scala63
-rw-r--r--crashboxd/src/test/scala/io/crashbox/ci/DockerExecutorSpec.scala78
4 files changed, 358 insertions, 9 deletions
diff --git a/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala b/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala
new file mode 100644
index 0000000..f77ce0a
--- /dev/null
+++ b/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala
@@ -0,0 +1,220 @@
+package io.crashbox.ci
+
+import akka.actor.{ Actor, ActorLogging, ActorRef, ActorSystem }
+import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler, StageLogging }
+import akka.stream.{ Attributes, FanInShape2, Inlet, Outlet }
+import akka.stream.stage.{ GraphStage }
+import io.crashbox.ci.DockerExecutor.ExecutionId
+import java.io.{ File, OutputStream }
+import java.util.concurrent.atomic.AtomicInteger
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{ HashMap, Queue }
+import scala.concurrent.Future
+import scala.util.{ Failure, Success }
+
+
+case class BuildId(id: String) extends AnyVal
+
+case class TaskId(buildId: String, taskIdx: Int) {
+ override def toString = s"$buildId#$taskIdx"
+}
+
+class BuildStage(
+ executor: DockerExecutor,
+ mkdir: TaskId => File,
+ mkout: TaskId => OutputStream
+) extends GraphStage[FanInShape2[(TaskId, TaskDef), TaskId, Builder.BuildState]] {
+ import Builder._
+
+ val submissions = Inlet[(TaskId, TaskDef)]("Builder.submissions")
+ val cancellations = Inlet[TaskId]("Builder.cancellations")
+ val states = Outlet[BuildState]("Builder.states")
+
+ val shape = new FanInShape2(submissions, cancellations, states)
+
+ def createLogic(attributes: Attributes) = new GraphStageLogic(shape) with StageLogging {
+ implicit def ec = materializer.executionContext
+ //import scala.concurrent.ExecutionContext.Implicits.global
+
+ var runningTaskId: Option[TaskId] = None
+ var runningExecutionId: Future[ExecutionId] =
+ Future.failed(new RuntimeException("not started"))
+
+ override def preStart(): Unit = {
+ log.info("Starting build stage")
+ pull(cancellations)
+ pull(submissions)
+ }
+
+ setHandler(cancellations, new InHandler {
+
+ override def onPush(): Unit = {
+ val tid = grab(cancellations)
+ if (runningTaskId == tid) {
+ runningExecutionId.foreach { eid =>
+ executor.stop(eid)
+ }
+ }
+ pull(cancellations)
+ }
+
+ // don't finish stage when cancellations closes
+ override def onUpstreamFinish() = {}
+
+ })
+
+ setHandler(submissions, new InHandler {
+
+ val changeState = getAsyncCallback[BuildState]{state =>
+ log.info(s"${state.taskId} transitioned to $state")
+ emit(states, state)
+ }
+ val pullNext = getAsyncCallback[Unit]{ _ =>
+ if (isAvailable(states)) {
+ pull(submissions)
+ }
+ }
+
+ override def onPush(): Unit = {
+ val (tid, tdef) = grab(submissions)
+ log.info(s"$tid new submission $tdef")
+
+ val image = tdef.environment match {
+ case DockerEnvironment(image) => image
+ case env => sys.error("Unsupported environemnt")
+ }
+
+ changeState.invoke(TaskStarting(tid, tdef))
+ runningTaskId = Some(tid)
+ runningExecutionId = executor.start(
+ image,
+ tdef.script,
+ mkdir(tid),
+ mkout(tid)
+ ).map{ eid =>
+ changeState.invoke(TaskRunning(tid, eid))
+ eid
+ }
+
+ runningExecutionId.flatMap { eid =>
+ executor.result(eid)
+ } onComplete { result =>
+ result match {
+ case Success(status) =>
+ changeState.invoke(TaskFinished(tid, status))
+ case Failure(err) =>
+ changeState.invoke(TaskFailed(tid, err.toString))
+ }
+ pullNext.invoke(())
+ }
+ }
+
+ override def onUpstreamFinish() = {
+ log.info(s"${submissions} finished, completing stage after final build")
+ val callback = getAsyncCallback[Unit]{_ =>
+ completeStage()
+ }
+ runningExecutionId.onComplete { _ => callback.invoke(()) }
+ }
+
+ })
+
+ setHandler(states, GraphStageLogic.IgnoreTerminateOutput)
+
+ override def postStop() = {
+ log.info("postStop")
+ runningExecutionId foreach { eid =>
+ log.error("kill")
+ executor.stop(eid)
+ }
+ }
+
+ }
+}
+
+// class Builder(
+// listener: ActorRef,
+// exec: DockerExecutor,
+// mkdir: (BuildId, Int) => File,
+// mkout: (BuildId, Int) => OutputStream
+// ) extends Actor with ActorLogging {
+// import context._
+// import Builder._
+
+// val queue = Queue.empty[(BuildId, Int, BuildDef)]
+// val running = TrieMap.empty[(BuildId, Int), ExecutionId]
+
+// val Parallelism = 2
+// var slots = Parallelism
+
+// def receive = {
+
+// case Cancel(bid) =>
+// running.find(_._1._1 == bid).foreach { case (_, eid) =>
+// // stopping will cause the pipeline to fail an thus remove itself from
+// // the running map
+// exec.stop(eid)
+// }
+
+// case Done =>
+// slots += 1
+// listener ! Next
+
+// case Next if slots > 0 =>
+// slots -= 1
+// val (bid, tid, bdf) = queue.dequeue()
+// val tdf = bdf.tasks(tid)
+// val image = tdf.environment match {
+// case DockerEnvironment(image) => image
+// case env => {log.error(s"can't run $env"); ???}
+// }
+
+// listener ! TaskStarting(bid, tid, tdf)
+
+// val pipeline = for (
+// eid <- exec.start(image, tdf.script, mkdir(bid, tid), mkout(bid, tid));
+// _ = listener ! TaskRunning(bid, tid, eid);
+// _ = running += (bid, tid) -> eid;
+// status <- exec.result(eid)
+// ) yield {
+// status
+// }
+// pipeline onComplete { res =>
+// running -= ((bid, tid))
+// self ! Done
+// res match {
+// case Success(status) => listener ! TaskFinished(bid, tid, status)
+// case Failure(err) => listener ! TaskFailed(bid, tid, err.toString)
+// }
+// }
+
+// case Next => // no slots available, do nothing
+
+// }
+
+// override def postStop() = {
+// for ((_, eid) <- running) {
+// exec.stop(eid)
+// }
+// }
+
+// }
+
+object Builder {
+
+ //case class Submit(id: BuildId, build: BuildDef)
+ //case class Cancel(id: BuildId)
+
+ private case object Next
+ private case object Done
+
+ sealed trait BuildState {
+ def taskId: TaskId
+ }
+ case class TaskStarting(taskId: TaskId, taskDef: TaskDef) extends BuildState
+ case class TaskRunning(taskId: TaskId, execId: ExecutionId) extends BuildState
+
+ case class TaskFinished(taskId: TaskId, status: Int) extends BuildState
+ case class TaskFailed(taskId: TaskId, message: String) extends BuildState
+
+}
diff --git a/crashboxd/src/main/scala/io/crashbox/ci/DockerExecutor.scala b/crashboxd/src/main/scala/io/crashbox/ci/DockerExecutor.scala
index 3c24b06..5abf7ae 100644
--- a/crashboxd/src/main/scala/io/crashbox/ci/DockerExecutor.scala
+++ b/crashboxd/src/main/scala/io/crashbox/ci/DockerExecutor.scala
@@ -53,7 +53,9 @@ class DockerExecutor(uri: String = "unix:///run/docker.sock")(
}
})
//dockerClient.build
- }*/
+ }*/
+
+ def label = "build"
def start(
image: String,
@@ -70,7 +72,7 @@ class DockerExecutor(uri: String = "unix:///run/docker.sock")(
val hostConfig = HostConfig.builder().binds(volume).build()
val containerConfig = ContainerConfig
.builder()
- .labels(Map("crashbox" -> "build").asJava)
+ .labels(Map("crashbox" -> label).asJava)
.hostConfig(hostConfig)
.tty(true) // combine stdout and stderr into stdout
.image(image)
diff --git a/crashboxd/src/test/scala/io/crashbox/ci/BuildStageSpec.scala b/crashboxd/src/test/scala/io/crashbox/ci/BuildStageSpec.scala
new file mode 100644
index 0000000..a3a68b6
--- /dev/null
+++ b/crashboxd/src/test/scala/io/crashbox/ci/BuildStageSpec.scala
@@ -0,0 +1,63 @@
+package io.crashbox.ci
+
+import akka.actor.ActorSystem
+import akka.stream.{ ClosedShape, KillSwitch }
+import akka.stream.scaladsl.{ GraphDSL, RunnableGraph, Sink, Source }
+import akka.stream.{ ActorMaterializer, FanInShape2 }
+import java.io.{ ByteArrayOutputStream, File }
+import java.nio.file.Files
+import org.scalatest._
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+class BuildStageSpec extends FlatSpec with Matchers with DockerSuite{
+
+ implicit val materializer = ActorMaterializer()
+
+ val exec = new DockerExecutor
+
+ def withTmp[A](action: (File, ByteArrayOutputStream) => A): A = {
+ val dir = Files.createTempDirectory("crashbox-build-stage-test").toFile
+ val out = new ByteArrayOutputStream(1024)
+ try action(dir, out)
+ finally dir.delete()
+ }
+
+ "BuildStage" should "have a test!" in {
+ withTmp{ case (dir, out) =>
+ val taskDef = TaskDef(DockerEnvironment("crashbox"), "sleep 100; exit 0")
+
+ val resultSink = Sink.foreach[Builder.BuildState](x => println(x))
+
+ val graph = RunnableGraph.fromGraph(GraphDSL.create(resultSink) {
+ implicit b => sink =>
+ import GraphDSL.Implicits._
+
+ val builder = b.add(new BuildStage(exec, _ => dir, _ => out))
+
+ val submissions = b.add(
+ Source.repeat(TaskId("123", 2) -> taskDef))
+ val cancellations = b.add(
+ Source.tick(10.seconds, 10.seconds, TaskId("0", 0)))
+
+ val ks = b.add(KillSwitch)
+
+ submissions ~> builder.in0
+ cancellations ~> builder.in1
+
+ builder.out ~> sink
+
+ ClosedShape
+ })
+
+ graph.run()
+ Thread.sleep(30000)
+ println("terminating")
+ Await.result(system.terminate(), 60.seconds)
+ println("teminated")
+ Thread.sleep(5000)
+ println("eot")
+ }
+
+ }
+}
diff --git a/crashboxd/src/test/scala/io/crashbox/ci/DockerExecutorSpec.scala b/crashboxd/src/test/scala/io/crashbox/ci/DockerExecutorSpec.scala
index 2b6dce7..23cbced 100644
--- a/crashboxd/src/test/scala/io/crashbox/ci/DockerExecutorSpec.scala
+++ b/crashboxd/src/test/scala/io/crashbox/ci/DockerExecutorSpec.scala
@@ -1,5 +1,6 @@
package io.crashbox.ci
+import com.spotify.docker.client.DockerClient.ListContainersParam
import java.io.{ByteArrayOutputStream, File}
import java.nio.file.Files
@@ -9,6 +10,69 @@ import scala.concurrent.duration._
import akka.actor.ActorSystem
import org.scalatest._
+import scala.util.Random
+
+
+trait DockerSuite extends Suite with BeforeAndAfterAll { self =>
+
+ private val name = self.toString()
+
+ private def withTmp[A](action: File => A): A = {
+ val dir = Files.createTempDirectory("crashbox-docker-test-" + name).toFile
+ try action(dir)
+ finally dir.delete()
+ }
+
+ val baseImage = "debian:jessie-backports"
+ val dockerImage = "crashbox"
+ val dockerTimeout = 30.seconds
+ val dockerLabel = "test-" + Random.nextInt()
+
+ implicit val system = ActorSystem("crashbox-docker-test-" + name)
+ import system.dispatcher
+ val executor = new DockerExecutor {
+ override def label = dockerLabel
+ }
+
+ def buildImage(): Unit = {
+ println("Pulling base docker image for running docker tests")
+ executor.dockerClient.pull(baseImage)
+
+ withTmp { dir =>
+ println("Adapting base image for tests")
+ val modifications = s"""|FROM $baseImage
+ |RUN adduser crashbox
+ |USER crashbox
+ |""".stripMargin
+ Files.write((new File(dir, "Dockerfile")).toPath, modifications.getBytes)
+ executor.dockerClient.build(dir.toPath, dockerImage)
+ }
+ }
+
+ def runningDockers: Seq[String] = {
+ val stale = executor.dockerClient
+ .listContainers(
+ ListContainersParam.withLabel("crashbox", dockerLabel)
+ ).asScala
+ stale.map(_.id())
+ }
+
+ override def beforeAll: Unit = {
+ buildImage()
+ }
+
+ override def afterAll: Unit = {
+ val running = runningDockers
+ running.foreach { id =>
+ executor.dockerClient.stopContainer(id, 0)
+ executor.dockerClient.removeContainer(id)
+ }
+ require(running.isEmpty, "Docker containers were left running after unit tests")
+ require(runningDockers.isEmpty, "Could not delete left over docker containers.")
+ }
+
+}
+
class DockerExecutorSpec
extends FlatSpec
@@ -52,13 +116,13 @@ class DockerExecutorSpec
def run[A](script: String)(tests: (Int, File, String) => A): A = withTmp {
dir =>
- val out = new ByteArrayOutputStream(1024)
- val awaitable = for (id <- exec.start(image, script, dir, out);
- status <- exec.result(id)) yield {
- status
- }
- val status = Await.result(awaitable, timeout)
- tests(status, dir, new String(out.toByteArray()).trim())
+ val out = new ByteArrayOutputStream(1024)
+ val awaitable = for (id <- exec.start(image, script, dir, out);
+ status <- exec.result(id)) yield {
+ status
+ }
+ val status = Await.result(awaitable, timeout)
+ tests(status, dir, new String(out.toByteArray()).trim())
}
"DockerExecutor" should "return expected exit codes" in {