authorJakob Odersky <>2017-03-06 01:32:39 -0800
committerJakob Odersky <>2017-03-06 01:32:39 -0800
commit374ac0ce8c10e9063c263ecf9c708f9ee6772973 (patch)
parentf9cc5e3f8478eb9f19cca093b8019e579f9c87e6 (diff)
Basic, single-task builder
-rw-r--r--crashbox-server/build.sbt (renamed from crashbox-worker/build.sbt)3
-rw-r--r--crashbox-server/src/test/scala/io/crashbox/ci/SourceSpec.scala (renamed from crashbox-worker/src/test/scala/io/crashbox/ci/source/GitFetchersSpec.scala)5
-rw-r--r--crashbox-server/src/test/scala/io/crashbox/ci/TestUtil.scala (renamed from crashbox-worker/src/test/scala/io/crashbox/ci/TestUtil.scala)0
21 files changed, 432 insertions, 141 deletions
diff --git a/build.sbt b/build.sbt
index aa14fd3..41345bb 100644
--- a/build.sbt
+++ b/build.sbt
@@ -11,8 +11,6 @@ scalacOptions in ThisBuild ++= Seq(
fork in ThisBuild := true
cancelable in Global := true
-lazy val root = (project in file(".")).aggregate(http, worker)
+lazy val root = (project in file(".")).aggregate(server)
-lazy val http = (project in file("crashbox-http"))
-lazy val worker = (project in file("crashbox-worker"))
+lazy val server = (project in file("crashbox-server"))
diff --git a/crashbox-worker/build.sbt b/crashbox-server/build.sbt
index 2b9cf08..4b37caf 100644
--- a/crashbox-worker/build.sbt
+++ b/crashbox-server/build.sbt
@@ -1,7 +1,10 @@
import crashbox.Dependencies
libraryDependencies ++= Seq(
+ Dependencies.akkaActor,
+ Dependencies.akkaStream,
+ Dependencies.dockerClient,
Dependencies.scalatest % Test
diff --git a/crashbox-server/src/main/resources/reference.conf b/crashbox-server/src/main/resources/reference.conf
new file mode 100644
index 0000000..a156b88
--- /dev/null
+++ b/crashbox-server/src/main/resources/reference.conf
@@ -0,0 +1,25 @@
+crashbox {
+ blocking-dispatcher {
+ type = Dispatcher
+ executor = "thread-pool-executor"
+ thread-pool-executor {
+ fixed-pool-size = 16
+ }
+ throughput = 100
+ }
+ streams {
+ directory = "streams"
+ }
+akka {
+ #loglevel = "DEBUG"
+ actor {
+ guardian-supervisor-strategy = ""
+ }
+} \ No newline at end of file
diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Builders.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Builders.scala
new file mode 100644
index 0000000..7e55640
--- /dev/null
+++ b/crashbox-server/src/main/scala/io/crashbox/ci/Builders.scala
@@ -0,0 +1,100 @@
+import com.spotify.docker.client.DockerClient.{ AttachParameter, ListContainersParam }
+import com.spotify.docker.client.LogStream
+import com.spotify.docker.client.exceptions.ContainerNotFoundException
+import com.spotify.docker.client.messages.{ ContainerConfig, HostConfig, LogConfig }
+import com.spotify.docker.client.messages.HostConfig.Bind
+import{ File, OutputStream }
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.collection.JavaConverters._
+import com.spotify.docker.client.DefaultDockerClient
+trait Builders { core: Core =>
+ val dockerClient = DefaultDockerClient.builder().uri("unix:///run/docker.sock").build()
+ core.system.registerOnTermination{
+ dockerClient.close()
+ }
+ def containerUser = "crashbox"
+ def containerWorkDirectory = "/home/crashbox"
+ def containerKillTimeout = 10.seconds
+ case class ContainerId(id: String) {
+ override def toString = id
+ }
+ def startBuild(
+ image: String,
+ script: String,
+ dir: File,
+ out: OutputStream
+ ): Future[ContainerId] = Future {
+ val volume = Bind.builder().from(dir.getAbsolutePath).to(containerWorkDirectory).build()
+ val hostConfig = HostConfig.builder().binds(volume).build()
+ val containerConfig = ContainerConfig.builder()
+ .labels(Map("crashbox" -> "build").asJava)
+ .hostConfig(hostConfig)
+ .tty(true) // combine stdout and stderr into stdout
+ .image(image)
+ .user(containerUser)
+ .workingDir(containerWorkDirectory)
+ .entrypoint("/bin/sh", "-c")
+ .cmd(script)
+ .build()
+ val container = dockerClient.createContainer(containerConfig).id
+ log.debug(s"Starting container $container")
+ dockerClient.startContainer(container)
+ log.debug(s"Attaching log stream of container $container")
+ blockingDispatcher execute new Runnable {
+ override def run() = {
+ var stream: LogStream = null
+ try {
+ stream = dockerClient.attachContainer(
+ container,
+ AttachParameter.LOGS,
+ AttachParameter.STDOUT,
+ AttachParameter.STREAM
+ )
+ stream.attach(out, null, true)
+ } finally {
+ if (stream != null) stream.close()
+ }
+ }
+ }
+ ContainerId(container)
+ }(blockingDispatcher)
+ def waitBuild(id: ContainerId): Future[Int] = Future {
+ log.debug(s"Waiting for container $id to exit")
+ val res: Int = dockerClient.waitContainer(
+ cancelBuild(id)
+ res
+ }(blockingDispatcher)
+ def cancelBuild(id: ContainerId): Unit = {
+ log.debug(s"Stopping container $id")
+ try {
+ dockerClient.stopContainer(, containerKillTimeout.toUnit(SECONDS).toInt)
+ dockerClient.removeContainer(
+ } catch {
+ case _: ContainerNotFoundException => // build already cancelled
+ }
+ }
+ def reapDeadBuilds(): Unit = {
+ val stale = dockerClient.listContainers(
+ ListContainersParam.withLabel("crashbox"),
+ ListContainersParam.withStatusExited()
+ ).asScala
+ stale.foreach { container =>
+ dockerClient.removeContainer(
+ }
+ }
diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Core.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Core.scala
new file mode 100644
index 0000000..8342293
--- /dev/null
+++ b/crashbox-server/src/main/scala/io/crashbox/ci/Core.scala
@@ -0,0 +1,22 @@
+import scala.concurrent.duration.Duration
+import scala.concurrent.{ Await, ExecutionContext }
+trait Core {
+ implicit val system: ActorSystem = ActorSystem("crashbox")
+ implicit val executionContext: ExecutionContext = system.dispatcher
+ val blockingDispatcher: ExecutionContext = system.dispatchers.lookup("crashbox.blocking-dispatcher")
+ def log = system.log
+ def config = system.settings.config
+ sys.addShutdownHook {
+"Shutting down systm")
+ Await.ready(system.terminate(), Duration.Inf)
+ println("shutdown")
+ }
diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Main.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Main.scala
new file mode 100644
index 0000000..5b58875
--- /dev/null
+++ b/crashbox-server/src/main/scala/io/crashbox/ci/Main.scala
@@ -0,0 +1,26 @@
+object Main extends Core
+ with Schedulers
+ with Builders
+ with Parsers
+ with Source
+ with StreamStore {
+ def main(args: Array[String]): Unit = {
+ reapDeadBuilds()
+ start(
+ "random_build",
+ new URL("file:///home/jodersky/tmp/dummy"),
+ () => saveStream("random_build"),
+ state => println(state)
+ )
+ Thread.sleep(15000)
+ System.exit(0)
+ }
diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Parsers.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Parsers.scala
new file mode 100644
index 0000000..6f43380
--- /dev/null
+++ b/crashbox-server/src/main/scala/io/crashbox/ci/Parsers.scala
@@ -0,0 +1,36 @@
+import java.nio.file.Files
+import scala.collection.JavaConverters._
+trait Parsers {
+ def defaultImage = "crashbox/default"
+ case class BuildDef(
+ image: String,
+ script: String
+ )
+ case class ParseError(message: String)
+ def parseBuild(workdir: File): Either[BuildDef, ParseError] = {
+ val file = new File(workdir, ".crashbox.txt")
+ if (!file.exists()) {
+ return Right(ParseError("No build configuration file .crashbox.txt found."))
+ }
+ val lines = Files.readAllLines(file.toPath)
+ val Pattern = """(\w+)\s*:\s*(.+)""".r
+ val image = lines.collectFirst{case Pattern("image", s) => s}.getOrElse(defaultImage)
+ val script = lines.collectFirst{case Pattern("script", s) => s}
+ script match {
+ case Some(s) => Left(BuildDef(image, s))
+ case None => Right(ParseError("No build script defined in configuration."))
+ }
+ }
diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Schedulers.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Schedulers.scala
new file mode 100644
index 0000000..c860932
--- /dev/null
+++ b/crashbox-server/src/main/scala/io/crashbox/ci/Schedulers.scala
@@ -0,0 +1,156 @@
+import{ Actor, ActorLogging, ActorRef, OneForOneStrategy, Props, Terminated }
+import{ Attributes, Outlet, SourceShape }
+import{ File, OutputStream }
+import java.nio.file.Files
+import java.util.Base64
+import scala.collection.mutable.HashMap
+import scala.concurrent.{ Await, Future }
+import scala.concurrent.duration._
+import scala.util.control.NonFatal
+import scala.util.{ Failure, Success }
+trait Schedulers extends { self: Core with Source with Builders with Parsers =>
+ private def newTempDir: File = Files.createTempDirectory("crashbox-run").toFile()
+ sealed trait BuildState
+ case class Cloning(url: URL) extends BuildState
+ case class Parsing(dir: File) extends BuildState
+ case class Starting(dir: File, buildDef: BuildDef) extends BuildState
+ case class Running(id: ContainerId) extends BuildState
+ sealed trait EndBuildState extends BuildState
+ case class Finished(status: Int) extends EndBuildState
+ case class Failed(message: String) extends EndBuildState
+ class BuildManager(
+ url: URL,
+ openOut: () => OutputStream,
+ update: BuildState => Unit
+ ) extends Actor with ActorLogging {
+ var buildDir: Option[File] = None
+ var out: Option[OutputStream] = None
+ var containerId: Option[ContainerId] = None
+ override def postStop() = {
+ containerId foreach { cancelBuild(_) }
+ out foreach { _.close() }
+ buildDir foreach { _.delete() }
+ }
+ override def preStart() = {
+"Started build manager for $url")
+ self ! Cloning(url)
+ }
+ override def receive: Receive = {
+ case state@Cloning(url) =>
+ log.debug("Update build state: cloning")
+ update(state)
+ fetchSource(url, newTempDir) onComplete {
+ case Success(dir) =>
+ self ! Parsing(dir)
+ case Failure(err) =>
+ self ! Failed(s"Error fetching source from $url")
+ }
+ case state@Parsing(src) =>
+ log.debug("Update build state: parsing")
+ update(state)
+ buildDir = Some(src)
+ parseBuild(src) match {
+ case Left(buildDef) =>
+ self ! Starting(src, buildDef)
+ case Right(err) =>
+ self ! Failed(s"Failed to parse build $err")
+ }
+ case state@Starting(src, bd) =>
+ log.debug("Update build state: starting")
+ update(state)
+ val so = openOut()
+ out = Some(so)
+ startBuild(bd.image, bd.script, src, so) onComplete {
+ case Success(id) =>
+ self ! Running(id)
+ case Failure(err) =>
+ self ! Failed(s"Failed to start build $err")
+ }
+ case state@Running(id) =>
+ log.debug("Update build state: running")
+ update(state)
+ containerId = Some(id)
+ waitBuild(id) onComplete {
+ case Success(status) =>
+ self ! Finished(status)
+ case Failure(err) =>
+ self ! Failed(s"Error waiting for build to complete")
+ }
+ case state@Finished(status) =>
+ log.debug("Update build state: finished")
+ update(state)
+ context stop self
+ case state@Failed(message) =>
+ log.debug("Update build state: failed")
+ update(state)
+ context stop self
+ }
+ }
+ object BuildManager {
+ def apply(buildId: String, url: URL, out: () => OutputStream, update: BuildState => Unit) =
+ Props(new BuildManager(url, out, update))
+ }
+ private sealed trait SchedulerCommand
+ private case class ScheduleBuild(
+ buildId: String, url: URL, out: () => OutputStream, update: BuildState => Unit
+ ) extends SchedulerCommand
+ class Scheduler extends Actor {
+ val runningBuilds = new HashMap[String, ActorRef]
+ override def receive = {
+ case sb: ScheduleBuild =>
+ runningBuilds.get(sb.buildId) match {
+ case Some(_) => //already running
+ case None =>
+ val buildManager = context.actorOf(BuildManager(
+ sb.buildId, sb.url, sb.out, sb.update), s"build-${sb.buildId}")
+ context watch buildManager
+ runningBuilds += sb.buildId -> buildManager
+ }
+ case Terminated(buildManager) =>
+ //TODO use a more efficient data structure
+ runningBuilds.find(_._2 == buildManager).foreach {
+ runningBuilds -= _._1
+ }
+ }
+ }
+ private val scheduler = system.actorOf(Props(new Scheduler()), "crashbox-scheduler")
+ def start(
+ buildId: String,
+ url: URL,
+ out: () => OutputStream,
+ update: BuildState => Unit
+ ): Unit = {
+ scheduler ! ScheduleBuild(buildId, url, out, update)
+ }
diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Source.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Source.scala
new file mode 100644
index 0000000..720b809
--- /dev/null
+++ b/crashbox-server/src/main/scala/io/crashbox/ci/Source.scala
@@ -0,0 +1,16 @@
+import org.eclipse.jgit.api.Git
+import scala.concurrent.Future
+trait Source { self: Core =>
+ def fetchSource(from: URL, to: File): Future[File] = Future {
+ log.debug(s"Cloning git repo from $from to $to")
+ Git.cloneRepository.setURI(from.toURI.toString).setDirectory(to).call()
+ to
+ }(blockingDispatcher)
diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/StateStore.scala b/crashbox-server/src/main/scala/io/crashbox/ci/StateStore.scala
new file mode 100644
index 0000000..94f51d0
--- /dev/null
+++ b/crashbox-server/src/main/scala/io/crashbox/ci/StateStore.scala
@@ -0,0 +1,7 @@
+trait StateStore {
diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/StreamStore.scala b/crashbox-server/src/main/scala/io/crashbox/ci/StreamStore.scala
new file mode 100644
index 0000000..5fd3769
--- /dev/null
+++ b/crashbox-server/src/main/scala/io/crashbox/ci/StreamStore.scala
@@ -0,0 +1,29 @@
+import{ File, FileInputStream, FileOutputStream, InputStream, OutputStream }
+trait StreamStore { self: Core =>
+ val streamsDirectory: File = new File(config.getString(""))
+ private def logFile(id: String): File = {
+ val bytes = MessageDigest.getInstance("SHA-256").digest(id.getBytes)
+ val str ={byte => Integer.toString((byte & 0xff) + 0x100, 16)}.mkString
+ val (head, tail) = str.splitAt(2)
+ new File(streamsDirectory, s"$head/$tail")
+ }
+ def saveStream(id: String): OutputStream = {
+ val file = logFile(id)
+ file.getParentFile.mkdirs()
+ file.createNewFile()
+ file.setWritable(true)
+ new FileOutputStream(file)
+ }
+ def readStream(id: String): InputStream = {
+ new FileInputStream(logFile(id))
+ }
diff --git a/crashbox-worker/src/test/scala/io/crashbox/ci/source/GitFetchersSpec.scala b/crashbox-server/src/test/scala/io/crashbox/ci/SourceSpec.scala
index a0886a5..9bef01d 100644
--- a/crashbox-worker/src/test/scala/io/crashbox/ci/source/GitFetchersSpec.scala
+++ b/crashbox-server/src/test/scala/io/crashbox/ci/SourceSpec.scala
@@ -1,5 +1,4 @@
-package source
import java.nio.file.Files
@@ -9,7 +8,7 @@ import scala.concurrent.duration._
import scala.concurrent.Await
import org.scalatest._
-class GitFetchersSpec extends FlatSpec with Matchers with GitFetchers {
+class SourceSpec extends FlatSpec with Matchers with Source with Core {
val Timeout = 10.seconds
@@ -27,7 +26,7 @@ class GitFetchersSpec extends FlatSpec with Matchers with GitFetchers {
TestUtil.withTempDir{ remote =>
TestUtil.withTempDir { local =>
- val cloned = Await.result(fetch(remote.toURI().toURL(), local), Timeout)
+ val cloned = Await.result(fetchSource(remote.toURI().toURL(), local), Timeout)
assert(cloned.listFiles().length == 3)
diff --git a/crashbox-worker/src/test/scala/io/crashbox/ci/TestUtil.scala b/crashbox-server/src/test/scala/io/crashbox/ci/TestUtil.scala
index eb177f8..eb177f8 100644
--- a/crashbox-worker/src/test/scala/io/crashbox/ci/TestUtil.scala
+++ b/crashbox-server/src/test/scala/io/crashbox/ci/TestUtil.scala
diff --git a/crashbox-worker/src/main/scala/io/crashbox/ci/build/Builders.scala b/crashbox-worker/src/main/scala/io/crashbox/ci/build/Builders.scala
deleted file mode 100644
index 14b43fe..0000000
--- a/crashbox-worker/src/main/scala/io/crashbox/ci/build/Builders.scala
+++ /dev/null
@@ -1,15 +0,0 @@
-package build
-import{ File, OutputStream }
-import scala.concurrent.Future
-trait Builders {
- def build(
- workdir: File,
- stdout: OutputStream,
- stderr: OutputStream
- ): Future[Int]
diff --git a/crashbox-worker/src/main/scala/io/crashbox/ci/build/ShellBuilders.scala b/crashbox-worker/src/main/scala/io/crashbox/ci/build/ShellBuilders.scala
deleted file mode 100644
index 71bd683..0000000
--- a/crashbox-worker/src/main/scala/io/crashbox/ci/build/ShellBuilders.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-package build
-import scala.concurrent.Future
-import scala.sys.process.{ Process, _ }
-import{ File, InputStream }
-import scala.concurrent.Future
-trait ShellBuilders extends Builders {
- def pipe(is: InputStream, os: OutputStream) = {
- var n = 0
- val buffer = new Array[Byte](1024);
- while ({n =; n > -1}) {
- os.write(buffer, 0, n);
- }
- os.close()
- }
- @deprecated("use git-specific execution context", "todo")
- implicit private val ec =
- override def build(workdir: File, stdout: OutputStream, stderr: OutputStream): Future[Int] = {
- def ignore(in: OutputStream): Unit = ()
- val io = new ProcessIO(ignore, pipe(_, stdout), pipe(_, stderr))
- Future{
- Process("./crashbox", Some(workdir)).run(io).exitValue()
- }
- }
diff --git a/crashbox-worker/src/main/scala/io/crashbox/ci/source/Fetchers.scala b/crashbox-worker/src/main/scala/io/crashbox/ci/source/Fetchers.scala
deleted file mode 100644
index 0fd4f38..0000000
--- a/crashbox-worker/src/main/scala/io/crashbox/ci/source/Fetchers.scala
+++ /dev/null
@@ -1,12 +0,0 @@
-package source
-import scala.concurrent.Future
-trait Fetchers {
- def fetch(from: URL, to: File): Future[File]
diff --git a/crashbox-worker/src/main/scala/io/crashbox/ci/source/GitFetchers.scala b/crashbox-worker/src/main/scala/io/crashbox/ci/source/GitFetchers.scala
deleted file mode 100644
index 9b603e2..0000000
--- a/crashbox-worker/src/main/scala/io/crashbox/ci/source/GitFetchers.scala
+++ /dev/null
@@ -1,19 +0,0 @@
-package source
-import org.eclipse.jgit.api.Git
-import scala.concurrent.Future
-trait GitFetchers extends Fetchers {
- @deprecated("use git-specific execution context", "todo")
- implicit private val ec =
- def fetch(from: URL, to: File): Future[File] = Future {
- Git.cloneRepository.setURI(from.toURI.toString).setDirectory(to).call()
- to
- }
diff --git a/crashbox-worker/src/test/scala/io/crashbox/ci/build/ShellBuildersSpec.scala b/crashbox-worker/src/test/scala/io/crashbox/ci/build/ShellBuildersSpec.scala
deleted file mode 100644
index 9419804..0000000
--- a/crashbox-worker/src/test/scala/io/crashbox/ci/build/ShellBuildersSpec.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-package build
-import{ BufferedOutputStream, ByteArrayOutputStream, File }
-import java.nio.file.Files
-import org.eclipse.jgit.util.Paths
-import scala.concurrent.duration._
-import scala.concurrent.Await
-import org.scalatest._
-class ShellBuildersSpec extends FlatSpec with Matchers with ShellBuilders {
- val Timeout = 10.seconds
- def runScript(script: String): (Int, String, String) = {
- val stdout = new ByteArrayOutputStream(4096)
- val stderr = new ByteArrayOutputStream(4096)
- val result = TestUtil.withTempDir{ dir =>
- val exec = new File(dir, "crashbox")
- exec.createNewFile()
- Files.write(exec.toPath, script.getBytes)
- exec.setExecutable(true)
- Await.result(build(dir, stdout, stderr), Timeout)
- }
- stdout.close()
- stderr.close()
- (result, new String(stdout.toByteArray(), "utf-8"), new String(stderr.toByteArray(), "utf-8"))
- }
- "ShellBuilders" should "run a shell script" in {
- val script = """|#!/bin/sh
- |echo "hello world"
- |echo "foo" >&2
- |""".stripMargin
- val (res, stdout, stderr) = runScript(script: String)
- assert(res == 0)
- assert(stdout == "hello world\n")
- assert(stderr == "foo\n")
- }
- it should "report a failed script" in {
- val script = """|#!/bin/sh
- |exit 1
- |""".stripMargin
- val (res, _, _) = runScript(script: String)
- assert(res == 1)
- }
diff --git a/images/Dockerfile.default b/images/Dockerfile.default
new file mode 100644
index 0000000..43b9a3d
--- /dev/null
+++ b/images/Dockerfile.default
@@ -0,0 +1,3 @@
+FROM debian:jessie-backports
+RUN adduser crashbox
diff --git a/images/Makefile b/images/Makefile
new file mode 100644
index 0000000..a722d4a
--- /dev/null
+++ b/images/Makefile
@@ -0,0 +1,2 @@
+%: Dockerfile.%
+ docker build --tag "crashbox/$@" --file $< .
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 97b5742..3d1de67 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -7,10 +7,13 @@ object Dependencies {
val akkaActor = "com.typesafe.akka" %% "akka-actor" % "2.4.17"
val akkaHttpCore = "com.typesafe.akka" %% "akka-http-core" % "10.0.4"
val akkaHttp = "com.typesafe.akka" %% "akka-http" % "10.0.4"
+ val akkaStream = "com.typesafe.akka" %% "akka-stream" % "2.4.17"
val jgitServer = "org.eclipse.jgit" % "org.eclipse.jgit.http.server" % ""
val jgitArchive = "org.eclipse.jgit" % "org.eclipse.jgit.archive" % ""
+ val dockerClient = "com.spotify" % "docker-client" % "8.1.1"
val scalatest = "org.scalatest" %% "scalatest" % "3.0.1"