path: root/crashboxd/src
diff options
Diffstat (limited to 'crashboxd/src')
5 files changed, 135 insertions, 297 deletions
diff --git a/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala b/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala
index f77ce0a..1e0c771 100644
--- a/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala
+++ b/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala
@@ -1,6 +1,7 @@
import{ Actor, ActorLogging, ActorRef, ActorSystem }
import{ GraphStageLogic, InHandler, OutHandler, StageLogging }
import{ Attributes, FanInShape2, Inlet, Outlet }
import{ GraphStage }
@@ -19,195 +20,69 @@ case class TaskId(buildId: String, taskIdx: Int) {
override def toString = s"$buildId#$taskIdx"
-class BuildStage(
+class BuildSource(
+ taskId: TaskId,
+ taskDef: TaskDef,
executor: DockerExecutor,
- mkdir: TaskId => File,
- mkout: TaskId => OutputStream
-) extends GraphStage[FanInShape2[(TaskId, TaskDef), TaskId, Builder.BuildState]] {
+ mkdir: => File,
+ mkout: => OutputStream // TODO refactor this into a two-output stage
+) extends GraphStage[SourceShape[Builder.BuildState]] {
import Builder._
- val submissions = Inlet[(TaskId, TaskDef)]("Builder.submissions")
- val cancellations = Inlet[TaskId]("Builder.cancellations")
val states = Outlet[BuildState]("Builder.states")
- val shape = new FanInShape2(submissions, cancellations, states)
+ val shape = new SourceShape(states)
def createLogic(attributes: Attributes) = new GraphStageLogic(shape) with StageLogging {
implicit def ec = materializer.executionContext
- //import
- var runningTaskId: Option[TaskId] = None
- var runningExecutionId: Future[ExecutionId] =
- Future.failed(new RuntimeException("not started"))
+ lazy val instance: Future[ExecutionId] = executor.start(
+ taskDef.environment.asInstanceOf[DockerEnvironment].image,
+ taskDef.script,
+ mkdir,
+ mkout
+ )
- override def preStart(): Unit = {
-"Starting build stage")
- pull(cancellations)
- pull(submissions)
+ val changeState = getAsyncCallback[BuildState]{state =>
+"${state.taskId} transitioned to $state")
+ emit(states, state)
- setHandler(cancellations, new InHandler {
- override def onPush(): Unit = {
- val tid = grab(cancellations)
- if (runningTaskId == tid) {
- runningExecutionId.foreach { eid =>
- executor.stop(eid)
- }
- }
- pull(cancellations)
- }
- // don't finish stage when cancellations closes
- override def onUpstreamFinish() = {}
- })
- setHandler(submissions, new InHandler {
- val changeState = getAsyncCallback[BuildState]{state =>
-"${state.taskId} transitioned to $state")
- emit(states, state)
- }
- val pullNext = getAsyncCallback[Unit]{ _ =>
- if (isAvailable(states)) {
- pull(submissions)
- }
- }
- override def onPush(): Unit = {
- val (tid, tdef) = grab(submissions)
-"$tid new submission $tdef")
- val image = tdef.environment match {
- case DockerEnvironment(image) => image
- case env => sys.error("Unsupported environemnt")
- }
- changeState.invoke(TaskStarting(tid, tdef))
- runningTaskId = Some(tid)
- runningExecutionId = executor.start(
- image,
- tdef.script,
- mkdir(tid),
- mkout(tid)
- ).map{ eid =>
- changeState.invoke(TaskRunning(tid, eid))
- eid
- }
- runningExecutionId.flatMap { eid =>
- executor.result(eid)
- } onComplete { result =>
- result match {
- case Success(status) =>
- changeState.invoke(TaskFinished(tid, status))
- case Failure(err) =>
- changeState.invoke(TaskFailed(tid, err.toString))
- }
- pullNext.invoke(())
- }
- }
+ val asyncClose = getAsyncCallback[Unit]{_ =>
+ completeStage()
+ }
- override def onUpstreamFinish() = {
-"${submissions} finished, completing stage after final build")
- val callback = getAsyncCallback[Unit]{_ =>
- completeStage()
+ override def preStart() = {
+ val pipeline = for (
+ _ <- Future.unit;
+ _ = changeState.invoke(TaskStarting(taskId, taskDef));
+ eid <- instance;
+ _ = changeState.invoke(TaskRunning(taskId, eid));
+ status <- executor.result(eid)
+ ) yield status
+ pipeline onComplete { result =>
+ result match {
+ case Success(status) =>
+ changeState.invoke(TaskFinished(taskId, status))
+ case Failure(err) =>
+ changeState.invoke(TaskFailed(taskId, err.toString))
- runningExecutionId.onComplete { _ => callback.invoke(()) }
+ asyncClose.invoke(())
- })
- setHandler(states, GraphStageLogic.IgnoreTerminateOutput)
+ }
override def postStop() = {
- runningExecutionId foreach { eid =>
- log.error("kill")
- executor.stop(eid)
- }
+ instance.foreach{ eid => executor.stop(eid)}
+ setHandler(states, GraphStageLogic.IgnoreTerminateOutput)
-// class Builder(
-// listener: ActorRef,
-// exec: DockerExecutor,
-// mkdir: (BuildId, Int) => File,
-// mkout: (BuildId, Int) => OutputStream
-// ) extends Actor with ActorLogging {
-// import context._
-// import Builder._
-// val queue = Queue.empty[(BuildId, Int, BuildDef)]
-// val running = TrieMap.empty[(BuildId, Int), ExecutionId]
-// val Parallelism = 2
-// var slots = Parallelism
-// def receive = {
-// case Cancel(bid) =>
-// running.find(_._1._1 == bid).foreach { case (_, eid) =>
-// // stopping will cause the pipeline to fail an thus remove itself from
-// // the running map
-// exec.stop(eid)
-// }
-// case Done =>
-// slots += 1
-// listener ! Next
-// case Next if slots > 0 =>
-// slots -= 1
-// val (bid, tid, bdf) = queue.dequeue()
-// val tdf = bdf.tasks(tid)
-// val image = tdf.environment match {
-// case DockerEnvironment(image) => image
-// case env => {log.error(s"can't run $env"); ???}
-// }
-// listener ! TaskStarting(bid, tid, tdf)
-// val pipeline = for (
-// eid <- exec.start(image, tdf.script, mkdir(bid, tid), mkout(bid, tid));
-// _ = listener ! TaskRunning(bid, tid, eid);
-// _ = running += (bid, tid) -> eid;
-// status <- exec.result(eid)
-// ) yield {
-// status
-// }
-// pipeline onComplete { res =>
-// running -= ((bid, tid))
-// self ! Done
-// res match {
-// case Success(status) => listener ! TaskFinished(bid, tid, status)
-// case Failure(err) => listener ! TaskFailed(bid, tid, err.toString)
-// }
-// }
-// case Next => // no slots available, do nothing
-// }
-// override def postStop() = {
-// for ((_, eid) <- running) {
-// exec.stop(eid)
-// }
-// }
-// }
object Builder {
- //case class Submit(id: BuildId, build: BuildDef)
- //case class Cancel(id: BuildId)
- private case object Next
- private case object Done
sealed trait BuildState {
def taskId: TaskId
diff --git a/crashboxd/src/main/scala/io/crashbox/ci/DockerExecutor.scala b/crashboxd/src/main/scala/io/crashbox/ci/DockerExecutor.scala
index 5abf7ae..1646777 100644
--- a/crashboxd/src/main/scala/io/crashbox/ci/DockerExecutor.scala
+++ b/crashboxd/src/main/scala/io/crashbox/ci/DockerExecutor.scala
@@ -1,6 +1,7 @@
import{File, OutputStream}
+import java.util.UUID
import scala.collection.JavaConverters._
import scala.concurrent.Future
@@ -12,6 +13,7 @@ import com.spotify.docker.client.DockerClient.{
+import scala.util.Random
import com.spotify.docker.client.LogStream
import com.spotify.docker.client.exceptions.ContainerNotFoundException
import com.spotify.docker.client.messages.{ContainerConfig, HostConfig}
@@ -40,22 +42,7 @@ class DockerExecutor(uri: String = "unix:///run/docker.sock")(
- /*
- def makeImage() = {
- val returnedImageId =
- Paths.get("docker directory"), "test", new ProgressHandler() {
- override def progress(message: ProgressMessage) = {
- val imageId = message.buildImageId()
- message.buildImageId()
- if (imageId != null) {
- //imageIdFromMessage.set(imageId);
- }
- }
- })
- //
- }*/
- def label = "build"
+ private val label = "crashbox-executor" -> UUID.randomUUID().toString
def start(
image: String,
@@ -72,7 +59,7 @@ class DockerExecutor(uri: String = "unix:///run/docker.sock")(
val hostConfig = HostConfig.builder().binds(volume).build()
val containerConfig = ContainerConfig
- .labels(Map("crashbox" -> label).asJava)
+ .labels(Map(label).asJava)
.tty(true) // combine stdout and stderr into stdout
@@ -126,16 +113,19 @@ class DockerExecutor(uri: String = "unix:///run/docker.sock")(
- def reapDeadBuilds(): Unit = {
+ def clean(): Boolean = {
val stale = dockerClient
- ListContainersParam.withLabel("crashbox"),
- ListContainersParam.withStatusExited()
+ ListContainersParam.withLabel(label._1, label._2)
- stale.foreach { container =>
- log.warning(s"Removing stale container ${}")
- dockerClient.removeContainer(
+ stale.isEmpty || {
+ stale.foreach { container =>
+ log.warning(s"Removing stale container ${}")
+ dockerClient.killContainer(
+ dockerClient.removeContainer(
+ }
+ false
diff --git a/crashboxd/src/test/scala/io/crashbox/ci/BuildStageSpec.scala b/crashboxd/src/test/scala/io/crashbox/ci/BuildStageSpec.scala
index a3a68b6..ec5ec56 100644
--- a/crashboxd/src/test/scala/io/crashbox/ci/BuildStageSpec.scala
+++ b/crashboxd/src/test/scala/io/crashbox/ci/BuildStageSpec.scala
@@ -1,6 +1,7 @@
import{ ClosedShape, KillSwitch }
import{ GraphDSL, RunnableGraph, Sink, Source }
import{ ActorMaterializer, FanInShape2 }
@@ -10,54 +11,47 @@ import org.scalatest._
import scala.concurrent.Await
import scala.concurrent.duration._
-class BuildStageSpec extends FlatSpec with Matchers with DockerSuite{
+class BuildStageSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
+ implicit val system = ActorSystem("crashboxd-buildstage")
implicit val materializer = ActorMaterializer()
+ val executor = new DockerExecutor
- val exec = new DockerExecutor
- def withTmp[A](action: (File, ByteArrayOutputStream) => A): A = {
- val dir = Files.createTempDirectory("crashbox-build-stage-test").toFile
- val out = new ByteArrayOutputStream(1024)
- try action(dir, out)
- finally dir.delete()
+ override def beforeAll(): Unit = {
+ DockerUtil.ensureImage(executor.dockerClient)
- "BuildStage" should "have a test!" in {
- withTmp{ case (dir, out) =>
- val taskDef = TaskDef(DockerEnvironment("crashbox"), "sleep 100; exit 0")
- val resultSink = Sink.foreach[Builder.BuildState](x => println(x))
- val graph = RunnableGraph.fromGraph(GraphDSL.create(resultSink) {
- implicit b => sink =>
- import GraphDSL.Implicits._
- val builder = b.add(new BuildStage(exec, _ => dir, _ => out))
- val submissions = b.add(
- Source.repeat(TaskId("123", 2) -> taskDef))
- val cancellations = b.add(
- Source.tick(10.seconds, 10.seconds, TaskId("0", 0)))
- val ks = b.add(KillSwitch)
- submissions ~> builder.in0
- cancellations ~> builder.in1
- builder.out ~> sink
+ override def afterAll(): Unit = {
+ assert(executor.clean(), "Spawned containers were not removed")
+ system.terminate()
+ }
- ClosedShape
- })
- Thread.sleep(30000)
- println("terminating")
- Await.result(system.terminate(), 60.seconds)
- println("teminated")
- Thread.sleep(5000)
- println("eot")
+ "BuildStage" should "have a test!" in {
+ TestUtil.withTempFile{ dir =>
+ TestUtil.withTempStream{ out =>
+ val taskDef = TaskDef(DockerEnvironment("crashbox"), "sleep 10; exit 0")
+ val resultSink = Sink.foreach[Builder.BuildState](x => println(x))
+ val stage = new BuildSource(
+ TaskId("build", 0),
+ taskDef,
+ executor,
+ dir,
+ out
+ )
+ val src = Source.fromGraph(stage)
+ //val done = src.toMat(resultSink)(Keep.right).run()
+ //executor.start("crashbox", "sleep 10000", dir, out)
+ Thread.sleep(1000)
+ assert(executor.clean())
+ //Await.ready(done, 30.seconds)
+ println("eot")
+ }
diff --git a/crashboxd/src/test/scala/io/crashbox/ci/DockerExecutorSpec.scala b/crashboxd/src/test/scala/io/crashbox/ci/DockerExecutorSpec.scala
index 23cbced..5ec95cd 100644
--- a/crashboxd/src/test/scala/io/crashbox/ci/DockerExecutorSpec.scala
+++ b/crashboxd/src/test/scala/io/crashbox/ci/DockerExecutorSpec.scala
@@ -1,5 +1,6 @@
+import com.spotify.docker.client.DockerClient
import com.spotify.docker.client.DockerClient.ListContainersParam
import{ByteArrayOutputStream, File}
import java.nio.file.Files
@@ -13,71 +14,36 @@ import org.scalatest._
import scala.util.Random
-trait DockerSuite extends Suite with BeforeAndAfterAll { self =>
+object DockerUtil {
+ import TestUtil._
- private val name = self.toString()
+ val defaultImage = "crashbox"
- private def withTmp[A](action: File => A): A = {
- val dir = Files.createTempDirectory("crashbox-docker-test-" + name).toFile
- try action(dir)
- finally dir.delete()
- }
- val baseImage = "debian:jessie-backports"
- val dockerImage = "crashbox"
- val dockerTimeout = 30.seconds
- val dockerLabel = "test-" + Random.nextInt()
- implicit val system = ActorSystem("crashbox-docker-test-" + name)
- import system.dispatcher
- val executor = new DockerExecutor {
- override def label = dockerLabel
- }
- def buildImage(): Unit = {
+ def ensureImage(client: DockerClient): Unit = {
println("Pulling base docker image for running docker tests")
- executor.dockerClient.pull(baseImage)
+ val baseImage = "debian:jessie-backports"
+ client.pull(baseImage)
- withTmp { dir =>
+ withTempFile { dir =>
println("Adapting base image for tests")
val modifications = s"""|FROM $baseImage
|RUN adduser crashbox
|USER crashbox
Files.write((new File(dir, "Dockerfile")).toPath, modifications.getBytes)
-, dockerImage)
- }
- }
- def runningDockers: Seq[String] = {
- val stale = executor.dockerClient
- .listContainers(
- ListContainersParam.withLabel("crashbox", dockerLabel)
- ).asScala
- }
- override def beforeAll: Unit = {
- buildImage()
- }
- override def afterAll: Unit = {
- val running = runningDockers
- running.foreach { id =>
- executor.dockerClient.stopContainer(id, 0)
- executor.dockerClient.removeContainer(id)
+, defaultImage)
- require(running.isEmpty, "Docker containers were left running after unit tests")
- require(runningDockers.isEmpty, "Could not delete left over docker containers.")
class DockerExecutorSpec
extends FlatSpec
with Matchers
- with BeforeAndAfterAll {
+ with BeforeAndAfterAll
+ with BeforeAndAfterEach {
+ import TestUtil._
val image = "crashbox"
@@ -88,33 +54,22 @@ class DockerExecutorSpec
val exec = new DockerExecutor
override def beforeAll: Unit = {
- println("Pulling base docker image for running docker tests")
- val base = "debian:jessie-backports"
- exec.dockerClient.pull(base)
- withTmp { dir =>
- println("Adapting base image for tests")
- val modifications = s"""|FROM $base
- |RUN adduser crashbox
- |USER crashbox
- |""".stripMargin
- Files.write((new File(dir, "Dockerfile")).toPath, modifications.getBytes)
-, image)
+ sys.addShutdownHook {
+ println("------------------- fooooo")
+ exec.clean()
+ DockerUtil.ensureImage(exec.dockerClient)
override def afterAll: Unit = {
- def withTmp[A](action: File => A): A = {
- val dir = Files.createTempDirectory("crashbox-docker-test").toFile
- try action(dir)
- finally dir.delete()
+ override def afterEach: Unit = {
+ assert(exec.clean(), "Spawned containers were not removed")
- def run[A](script: String)(tests: (Int, File, String) => A): A = withTmp {
+ def run[A](script: String)(tests: (Int, File, String) => A): A = withTempFile {
dir =>
val out = new ByteArrayOutputStream(1024)
val awaitable = for (id <- exec.start(image, script, dir, out);
@@ -167,8 +122,8 @@ class DockerExecutorSpec
- it should "allow cancellation" in {
- withTmp { dir =>
+ it should "allow cancellations" in {
+ withTempFile { dir =>
val script = "while true; do sleep 1; echo sleeping; done"
val out = new ByteArrayOutputStream(1024)
@@ -177,8 +132,6 @@ class DockerExecutorSpec
assert(res == 137)
- //TODO check if resoruces were cleaned up properly
Await.result(check, timeout)
diff --git a/crashboxd/src/test/scala/io/crashbox/ci/TestUtil.scala b/crashboxd/src/test/scala/io/crashbox/ci/TestUtil.scala
new file mode 100644
index 0000000..b5bacf2
--- /dev/null
+++ b/crashboxd/src/test/scala/io/crashbox/ci/TestUtil.scala
@@ -0,0 +1,26 @@
+import{ ByteArrayOutputStream, File }
+import java.nio.file.Files
+object TestUtil {
+ def withTempFile[A](action: File => A): A = {
+ def rm(parent: File): Unit = if (parent.isDirectory) {
+ parent.listFiles.foreach{ child =>
+ rm(child)
+ }
+ }
+ val dir = Files.createTempDirectory("crashbox-test").toFile
+ try action(dir)
+ finally rm(dir)
+ }
+ def withTempStream[A](action: ByteArrayOutputStream => A, size: Int = 1024): A = {
+ val out = new ByteArrayOutputStream(size)
+ try action(out)
+ finally out.close()
+ }