From 382152098459a5783e3a794161ed2da2a321af37 Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Sun, 9 Apr 2017 16:24:48 -0700 Subject: less leaky --- build.sbt | 2 +- .../src/main/scala/io/crashbox/ci/Builder.scala | 205 ++++----------------- .../main/scala/io/crashbox/ci/DockerExecutor.scala | 36 ++-- .../test/scala/io/crashbox/ci/BuildStageSpec.scala | 74 ++++---- .../scala/io/crashbox/ci/DockerExecutorSpec.scala | 91 +++------ .../src/test/scala/io/crashbox/ci/TestUtil.scala | 26 +++ 6 files changed, 136 insertions(+), 298 deletions(-) create mode 100644 crashboxd/src/test/scala/io/crashbox/ci/TestUtil.scala diff --git a/build.sbt b/build.sbt index b0931e4..f3c6b34 100644 --- a/build.sbt +++ b/build.sbt @@ -11,7 +11,7 @@ scalacOptions in ThisBuild ++= Seq( "-Xlint" ) fork in ThisBuild := true -cancelable in Global := true +//cancelable in Global := true lazy val root = (project in file(".")).aggregate(server) diff --git a/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala b/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala index f77ce0a..1e0c771 100644 --- a/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala +++ b/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala @@ -1,6 +1,7 @@ package io.crashbox.ci import akka.actor.{ Actor, ActorLogging, ActorRef, ActorSystem } +import akka.stream.SourceShape import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler, StageLogging } import akka.stream.{ Attributes, FanInShape2, Inlet, Outlet } import akka.stream.stage.{ GraphStage } @@ -19,195 +20,69 @@ case class TaskId(buildId: String, taskIdx: Int) { override def toString = s"$buildId#$taskIdx" } -class BuildStage( +class BuildSource( + taskId: TaskId, + taskDef: TaskDef, executor: DockerExecutor, - mkdir: TaskId => File, - mkout: TaskId => OutputStream -) extends GraphStage[FanInShape2[(TaskId, TaskDef), TaskId, Builder.BuildState]] { + mkdir: => File, + mkout: => OutputStream // TODO refactor this into a two-output stage +) extends GraphStage[SourceShape[Builder.BuildState]] { import Builder._ - val submissions = Inlet[(TaskId, TaskDef)]("Builder.submissions") - val cancellations = Inlet[TaskId]("Builder.cancellations") val states = Outlet[BuildState]("Builder.states") - val shape = new FanInShape2(submissions, cancellations, states) + val shape = new SourceShape(states) def createLogic(attributes: Attributes) = new GraphStageLogic(shape) with StageLogging { implicit def ec = materializer.executionContext - //import scala.concurrent.ExecutionContext.Implicits.global - var runningTaskId: Option[TaskId] = None - var runningExecutionId: Future[ExecutionId] = - Future.failed(new RuntimeException("not started")) + lazy val instance: Future[ExecutionId] = executor.start( + taskDef.environment.asInstanceOf[DockerEnvironment].image, + taskDef.script, + mkdir, + mkout + ) - override def preStart(): Unit = { - log.info("Starting build stage") - pull(cancellations) - pull(submissions) + val changeState = getAsyncCallback[BuildState]{state => + log.info(s"${state.taskId} transitioned to $state") + emit(states, state) } - setHandler(cancellations, new InHandler { - - override def onPush(): Unit = { - val tid = grab(cancellations) - if (runningTaskId == tid) { - runningExecutionId.foreach { eid => - executor.stop(eid) - } - } - pull(cancellations) - } - - // don't finish stage when cancellations closes - override def onUpstreamFinish() = {} - - }) - - setHandler(submissions, new InHandler { - - val changeState = getAsyncCallback[BuildState]{state => - log.info(s"${state.taskId} transitioned to $state") - emit(states, state) - } - val pullNext = getAsyncCallback[Unit]{ _ => - if (isAvailable(states)) { - pull(submissions) - } - } - - override def onPush(): Unit = { - val (tid, tdef) = grab(submissions) - log.info(s"$tid new submission $tdef") - - val image = tdef.environment match { - case DockerEnvironment(image) => image - case env => sys.error("Unsupported environemnt") - } - - changeState.invoke(TaskStarting(tid, tdef)) - runningTaskId = Some(tid) - runningExecutionId = executor.start( - image, - tdef.script, - mkdir(tid), - mkout(tid) - ).map{ eid => - changeState.invoke(TaskRunning(tid, eid)) - eid - } - - runningExecutionId.flatMap { eid => - executor.result(eid) - } onComplete { result => - result match { - case Success(status) => - changeState.invoke(TaskFinished(tid, status)) - case Failure(err) => - changeState.invoke(TaskFailed(tid, err.toString)) - } - pullNext.invoke(()) - } - } + val asyncClose = getAsyncCallback[Unit]{_ => + completeStage() + } - override def onUpstreamFinish() = { - log.info(s"${submissions} finished, completing stage after final build") - val callback = getAsyncCallback[Unit]{_ => - completeStage() + override def preStart() = { + val pipeline = for ( + _ <- Future.unit; + _ = changeState.invoke(TaskStarting(taskId, taskDef)); + eid <- instance; + _ = changeState.invoke(TaskRunning(taskId, eid)); + status <- executor.result(eid) + ) yield status + + pipeline onComplete { result => + result match { + case Success(status) => + changeState.invoke(TaskFinished(taskId, status)) + case Failure(err) => + changeState.invoke(TaskFailed(taskId, err.toString)) } - runningExecutionId.onComplete { _ => callback.invoke(()) } + asyncClose.invoke(()) } - - }) - - setHandler(states, GraphStageLogic.IgnoreTerminateOutput) + } override def postStop() = { - log.info("postStop") - runningExecutionId foreach { eid => - log.error("kill") - executor.stop(eid) - } + instance.foreach{ eid => executor.stop(eid)} } + setHandler(states, GraphStageLogic.IgnoreTerminateOutput) + } } -// class Builder( -// listener: ActorRef, -// exec: DockerExecutor, -// mkdir: (BuildId, Int) => File, -// mkout: (BuildId, Int) => OutputStream -// ) extends Actor with ActorLogging { -// import context._ -// import Builder._ - -// val queue = Queue.empty[(BuildId, Int, BuildDef)] -// val running = TrieMap.empty[(BuildId, Int), ExecutionId] - -// val Parallelism = 2 -// var slots = Parallelism - -// def receive = { - -// case Cancel(bid) => -// running.find(_._1._1 == bid).foreach { case (_, eid) => -// // stopping will cause the pipeline to fail an thus remove itself from -// // the running map -// exec.stop(eid) -// } - -// case Done => -// slots += 1 -// listener ! Next - -// case Next if slots > 0 => -// slots -= 1 -// val (bid, tid, bdf) = queue.dequeue() -// val tdf = bdf.tasks(tid) -// val image = tdf.environment match { -// case DockerEnvironment(image) => image -// case env => {log.error(s"can't run $env"); ???} -// } - -// listener ! TaskStarting(bid, tid, tdf) - -// val pipeline = for ( -// eid <- exec.start(image, tdf.script, mkdir(bid, tid), mkout(bid, tid)); -// _ = listener ! TaskRunning(bid, tid, eid); -// _ = running += (bid, tid) -> eid; -// status <- exec.result(eid) -// ) yield { -// status -// } -// pipeline onComplete { res => -// running -= ((bid, tid)) -// self ! Done -// res match { -// case Success(status) => listener ! TaskFinished(bid, tid, status) -// case Failure(err) => listener ! TaskFailed(bid, tid, err.toString) -// } -// } - -// case Next => // no slots available, do nothing - -// } - -// override def postStop() = { -// for ((_, eid) <- running) { -// exec.stop(eid) -// } -// } - -// } - object Builder { - //case class Submit(id: BuildId, build: BuildDef) - //case class Cancel(id: BuildId) - - private case object Next - private case object Done - sealed trait BuildState { def taskId: TaskId } diff --git a/crashboxd/src/main/scala/io/crashbox/ci/DockerExecutor.scala b/crashboxd/src/main/scala/io/crashbox/ci/DockerExecutor.scala index 5abf7ae..1646777 100644 --- a/crashboxd/src/main/scala/io/crashbox/ci/DockerExecutor.scala +++ b/crashboxd/src/main/scala/io/crashbox/ci/DockerExecutor.scala @@ -1,6 +1,7 @@ package io.crashbox.ci import java.io.{File, OutputStream} +import java.util.UUID import scala.collection.JavaConverters._ import scala.concurrent.Future @@ -12,6 +13,7 @@ import com.spotify.docker.client.DockerClient.{ AttachParameter, ListContainersParam } +import scala.util.Random import com.spotify.docker.client.LogStream import com.spotify.docker.client.exceptions.ContainerNotFoundException import com.spotify.docker.client.messages.{ContainerConfig, HostConfig} @@ -40,22 +42,7 @@ class DockerExecutor(uri: String = "unix:///run/docker.sock")( c } - /* - def makeImage() = { - val returnedImageId = dockerClient.build( - Paths.get("docker directory"), "test", new ProgressHandler() { - override def progress(message: ProgressMessage) = { - val imageId = message.buildImageId() - message.buildImageId() - if (imageId != null) { - //imageIdFromMessage.set(imageId); - } - } - }) - //dockerClient.build - }*/ - - def label = "build" + private val label = "crashbox-executor" -> UUID.randomUUID().toString def start( image: String, @@ -72,7 +59,7 @@ class DockerExecutor(uri: String = "unix:///run/docker.sock")( val hostConfig = HostConfig.builder().binds(volume).build() val containerConfig = ContainerConfig .builder() - .labels(Map("crashbox" -> label).asJava) + .labels(Map(label).asJava) .hostConfig(hostConfig) .tty(true) // combine stdout and stderr into stdout .image(image) @@ -126,16 +113,19 @@ class DockerExecutor(uri: String = "unix:///run/docker.sock")( } } - def reapDeadBuilds(): Unit = { + def clean(): Boolean = { val stale = dockerClient .listContainers( - ListContainersParam.withLabel("crashbox"), - ListContainersParam.withStatusExited() + ListContainersParam.withLabel(label._1, label._2) ) .asScala - stale.foreach { container => - log.warning(s"Removing stale container ${container.id}") - dockerClient.removeContainer(container.id) + stale.isEmpty || { + stale.foreach { container => + log.warning(s"Removing stale container ${container.id}") + dockerClient.killContainer(container.id) + dockerClient.removeContainer(container.id) + } + false } } diff --git a/crashboxd/src/test/scala/io/crashbox/ci/BuildStageSpec.scala b/crashboxd/src/test/scala/io/crashbox/ci/BuildStageSpec.scala index a3a68b6..ec5ec56 100644 --- a/crashboxd/src/test/scala/io/crashbox/ci/BuildStageSpec.scala +++ b/crashboxd/src/test/scala/io/crashbox/ci/BuildStageSpec.scala @@ -1,6 +1,7 @@ package io.crashbox.ci import akka.actor.ActorSystem +import akka.stream.scaladsl.Keep import akka.stream.{ ClosedShape, KillSwitch } import akka.stream.scaladsl.{ GraphDSL, RunnableGraph, Sink, Source } import akka.stream.{ ActorMaterializer, FanInShape2 } @@ -10,54 +11,47 @@ import org.scalatest._ import scala.concurrent.Await import scala.concurrent.duration._ -class BuildStageSpec extends FlatSpec with Matchers with DockerSuite{ +class BuildStageSpec extends FlatSpec with Matchers with BeforeAndAfterAll { + implicit val system = ActorSystem("crashboxd-buildstage") implicit val materializer = ActorMaterializer() + val executor = new DockerExecutor - val exec = new DockerExecutor - def withTmp[A](action: (File, ByteArrayOutputStream) => A): A = { - val dir = Files.createTempDirectory("crashbox-build-stage-test").toFile - val out = new ByteArrayOutputStream(1024) - try action(dir, out) - finally dir.delete() + override def beforeAll(): Unit = { + DockerUtil.ensureImage(executor.dockerClient) } - "BuildStage" should "have a test!" in { - withTmp{ case (dir, out) => - val taskDef = TaskDef(DockerEnvironment("crashbox"), "sleep 100; exit 0") - - val resultSink = Sink.foreach[Builder.BuildState](x => println(x)) - - val graph = RunnableGraph.fromGraph(GraphDSL.create(resultSink) { - implicit b => sink => - import GraphDSL.Implicits._ - - val builder = b.add(new BuildStage(exec, _ => dir, _ => out)) - - val submissions = b.add( - Source.repeat(TaskId("123", 2) -> taskDef)) - val cancellations = b.add( - Source.tick(10.seconds, 10.seconds, TaskId("0", 0))) - - val ks = b.add(KillSwitch) - - submissions ~> builder.in0 - cancellations ~> builder.in1 - - builder.out ~> sink + override def afterAll(): Unit = { + assert(executor.clean(), "Spawned containers were not removed") + system.terminate() + } - ClosedShape - }) - graph.run() - Thread.sleep(30000) - println("terminating") - Await.result(system.terminate(), 60.seconds) - println("teminated") - Thread.sleep(5000) - println("eot") + "BuildStage" should "have a test!" in { + TestUtil.withTempFile{ dir => + TestUtil.withTempStream{ out => + + val taskDef = TaskDef(DockerEnvironment("crashbox"), "sleep 10; exit 0") + val resultSink = Sink.foreach[Builder.BuildState](x => println(x)) + + val stage = new BuildSource( + TaskId("build", 0), + taskDef, + executor, + dir, + out + ) + val src = Source.fromGraph(stage) + + //val done = src.toMat(resultSink)(Keep.right).run() + + //executor.start("crashbox", "sleep 10000", dir, out) + Thread.sleep(1000) + assert(executor.clean()) + //Await.ready(done, 30.seconds) + println("eot") + } } - } } diff --git a/crashboxd/src/test/scala/io/crashbox/ci/DockerExecutorSpec.scala b/crashboxd/src/test/scala/io/crashbox/ci/DockerExecutorSpec.scala index 23cbced..5ec95cd 100644 --- a/crashboxd/src/test/scala/io/crashbox/ci/DockerExecutorSpec.scala +++ b/crashboxd/src/test/scala/io/crashbox/ci/DockerExecutorSpec.scala @@ -1,5 +1,6 @@ package io.crashbox.ci +import com.spotify.docker.client.DockerClient import com.spotify.docker.client.DockerClient.ListContainersParam import java.io.{ByteArrayOutputStream, File} import java.nio.file.Files @@ -13,71 +14,36 @@ import org.scalatest._ import scala.util.Random -trait DockerSuite extends Suite with BeforeAndAfterAll { self => +object DockerUtil { + import TestUtil._ - private val name = self.toString() + val defaultImage = "crashbox" - private def withTmp[A](action: File => A): A = { - val dir = Files.createTempDirectory("crashbox-docker-test-" + name).toFile - try action(dir) - finally dir.delete() - } - - val baseImage = "debian:jessie-backports" - val dockerImage = "crashbox" - val dockerTimeout = 30.seconds - val dockerLabel = "test-" + Random.nextInt() - - implicit val system = ActorSystem("crashbox-docker-test-" + name) - import system.dispatcher - val executor = new DockerExecutor { - override def label = dockerLabel - } - - def buildImage(): Unit = { + def ensureImage(client: DockerClient): Unit = { println("Pulling base docker image for running docker tests") - executor.dockerClient.pull(baseImage) + val baseImage = "debian:jessie-backports" + client.pull(baseImage) - withTmp { dir => + withTempFile { dir => println("Adapting base image for tests") val modifications = s"""|FROM $baseImage |RUN adduser crashbox |USER crashbox |""".stripMargin Files.write((new File(dir, "Dockerfile")).toPath, modifications.getBytes) - executor.dockerClient.build(dir.toPath, dockerImage) - } - } - - def runningDockers: Seq[String] = { - val stale = executor.dockerClient - .listContainers( - ListContainersParam.withLabel("crashbox", dockerLabel) - ).asScala - stale.map(_.id()) - } - - override def beforeAll: Unit = { - buildImage() - } - - override def afterAll: Unit = { - val running = runningDockers - running.foreach { id => - executor.dockerClient.stopContainer(id, 0) - executor.dockerClient.removeContainer(id) + client.build(dir.toPath, defaultImage) } - require(running.isEmpty, "Docker containers were left running after unit tests") - require(runningDockers.isEmpty, "Could not delete left over docker containers.") } } - class DockerExecutorSpec extends FlatSpec with Matchers - with BeforeAndAfterAll { + with BeforeAndAfterAll + with BeforeAndAfterEach { + + import TestUtil._ val image = "crashbox" @@ -88,33 +54,22 @@ class DockerExecutorSpec val exec = new DockerExecutor override def beforeAll: Unit = { - println("Pulling base docker image for running docker tests") - val base = "debian:jessie-backports" - exec.dockerClient.pull(base) - - withTmp { dir => - println("Adapting base image for tests") - val modifications = s"""|FROM $base - |RUN adduser crashbox - |USER crashbox - |""".stripMargin - Files.write((new File(dir, "Dockerfile")).toPath, modifications.getBytes) - exec.dockerClient.build(dir.toPath, image) + sys.addShutdownHook { + println("------------------- fooooo") + exec.clean() } - + DockerUtil.ensureImage(exec.dockerClient) } override def afterAll: Unit = { system.terminate() } - def withTmp[A](action: File => A): A = { - val dir = Files.createTempDirectory("crashbox-docker-test").toFile - try action(dir) - finally dir.delete() + override def afterEach: Unit = { + assert(exec.clean(), "Spawned containers were not removed") } - def run[A](script: String)(tests: (Int, File, String) => A): A = withTmp { + def run[A](script: String)(tests: (Int, File, String) => A): A = withTempFile { dir => val out = new ByteArrayOutputStream(1024) val awaitable = for (id <- exec.start(image, script, dir, out); @@ -167,8 +122,8 @@ class DockerExecutorSpec } } - it should "allow cancellation" in { - withTmp { dir => + it should "allow cancellations" in { + withTempFile { dir => val script = "while true; do sleep 1; echo sleeping; done" val out = new ByteArrayOutputStream(1024) @@ -177,8 +132,6 @@ class DockerExecutorSpec assert(res == 137) } exec.stop(id) - //TODO check if resoruces were cleaned up properly - Await.result(check, timeout) } } diff --git a/crashboxd/src/test/scala/io/crashbox/ci/TestUtil.scala b/crashboxd/src/test/scala/io/crashbox/ci/TestUtil.scala new file mode 100644 index 0000000..b5bacf2 --- /dev/null +++ b/crashboxd/src/test/scala/io/crashbox/ci/TestUtil.scala @@ -0,0 +1,26 @@ +package io.crashbox.ci + +import java.io.{ ByteArrayOutputStream, File } +import java.nio.file.Files + + +object TestUtil { + + def withTempFile[A](action: File => A): A = { + def rm(parent: File): Unit = if (parent.isDirectory) { + parent.listFiles.foreach{ child => + rm(child) + } + } + val dir = Files.createTempDirectory("crashbox-test").toFile + try action(dir) + finally rm(dir) + } + + def withTempStream[A](action: ByteArrayOutputStream => A, size: Int = 1024): A = { + val out = new ByteArrayOutputStream(size) + try action(out) + finally out.close() + } + +} -- cgit v1.2.3