summaryrefslogtreecommitdiff
path: root/crashbox-server/src/main/scala/io
diff options
context:
space:
mode:
Diffstat (limited to 'crashbox-server/src/main/scala/io')
-rw-r--r--crashbox-server/src/main/scala/io/crashbox/ci/Builders.scala100
-rw-r--r--crashbox-server/src/main/scala/io/crashbox/ci/Core.scala22
-rw-r--r--crashbox-server/src/main/scala/io/crashbox/ci/Main.scala26
-rw-r--r--crashbox-server/src/main/scala/io/crashbox/ci/Parsers.scala36
-rw-r--r--crashbox-server/src/main/scala/io/crashbox/ci/Schedulers.scala156
-rw-r--r--crashbox-server/src/main/scala/io/crashbox/ci/Source.scala16
-rw-r--r--crashbox-server/src/main/scala/io/crashbox/ci/StateStore.scala7
-rw-r--r--crashbox-server/src/main/scala/io/crashbox/ci/StreamStore.scala29
8 files changed, 392 insertions, 0 deletions
diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Builders.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Builders.scala
new file mode 100644
index 0000000..7e55640
--- /dev/null
+++ b/crashbox-server/src/main/scala/io/crashbox/ci/Builders.scala
@@ -0,0 +1,100 @@
+package io.crashbox.ci
+
+import com.spotify.docker.client.DockerClient.{ AttachParameter, ListContainersParam }
+import com.spotify.docker.client.LogStream
+import com.spotify.docker.client.exceptions.ContainerNotFoundException
+import com.spotify.docker.client.messages.{ ContainerConfig, HostConfig, LogConfig }
+import com.spotify.docker.client.messages.HostConfig.Bind
+import java.io.{ File, OutputStream }
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.collection.JavaConverters._
+import com.spotify.docker.client.DefaultDockerClient
+
+trait Builders { core: Core =>
+
+ val dockerClient = DefaultDockerClient.builder().uri("unix:///run/docker.sock").build()
+
+ core.system.registerOnTermination{
+ dockerClient.close()
+ }
+
+ def containerUser = "crashbox"
+ def containerWorkDirectory = "/home/crashbox"
+ def containerKillTimeout = 10.seconds
+
+ case class ContainerId(id: String) {
+ override def toString = id
+ }
+
+ def startBuild(
+ image: String,
+ script: String,
+ dir: File,
+ out: OutputStream
+ ): Future[ContainerId] = Future {
+ val volume = Bind.builder().from(dir.getAbsolutePath).to(containerWorkDirectory).build()
+ val hostConfig = HostConfig.builder().binds(volume).build()
+ val containerConfig = ContainerConfig.builder()
+ .labels(Map("crashbox" -> "build").asJava)
+ .hostConfig(hostConfig)
+ .tty(true) // combine stdout and stderr into stdout
+ .image(image)
+ .user(containerUser)
+ .workingDir(containerWorkDirectory)
+ .entrypoint("/bin/sh", "-c")
+ .cmd(script)
+ .build()
+ val container = dockerClient.createContainer(containerConfig).id
+
+ log.debug(s"Starting container $container")
+ dockerClient.startContainer(container)
+
+ log.debug(s"Attaching log stream of container $container")
+ blockingDispatcher execute new Runnable {
+ override def run() = {
+ var stream: LogStream = null
+ try {
+ stream = dockerClient.attachContainer(
+ container,
+ AttachParameter.LOGS,
+ AttachParameter.STDOUT,
+ AttachParameter.STREAM
+ )
+ stream.attach(out, null, true)
+ } finally {
+ if (stream != null) stream.close()
+ }
+ }
+ }
+ ContainerId(container)
+ }(blockingDispatcher)
+
+ def waitBuild(id: ContainerId): Future[Int] = Future {
+ log.debug(s"Waiting for container $id to exit")
+ val res: Int = dockerClient.waitContainer(id.id).statusCode()
+ cancelBuild(id)
+ res
+ }(blockingDispatcher)
+
+ def cancelBuild(id: ContainerId): Unit = {
+ log.debug(s"Stopping container $id")
+ try {
+ dockerClient.stopContainer(id.id, containerKillTimeout.toUnit(SECONDS).toInt)
+ dockerClient.removeContainer(id.id)
+ } catch {
+ case _: ContainerNotFoundException => // build already cancelled
+ }
+ }
+
+ def reapDeadBuilds(): Unit = {
+ val stale = dockerClient.listContainers(
+ ListContainersParam.withLabel("crashbox"),
+ ListContainersParam.withStatusExited()
+ ).asScala
+ stale.foreach { container =>
+ dockerClient.removeContainer(container.id())
+ }
+ }
+
+}
diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Core.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Core.scala
new file mode 100644
index 0000000..8342293
--- /dev/null
+++ b/crashbox-server/src/main/scala/io/crashbox/ci/Core.scala
@@ -0,0 +1,22 @@
+package io.crashbox.ci
+
+import akka.actor.ActorSystem
+import scala.concurrent.duration.Duration
+import scala.concurrent.{ Await, ExecutionContext }
+
+trait Core {
+
+ implicit val system: ActorSystem = ActorSystem("crashbox")
+ implicit val executionContext: ExecutionContext = system.dispatcher
+ val blockingDispatcher: ExecutionContext = system.dispatchers.lookup("crashbox.blocking-dispatcher")
+
+ def log = system.log
+ def config = system.settings.config
+
+ sys.addShutdownHook {
+ log.info("Shutting down systm")
+ Await.ready(system.terminate(), Duration.Inf)
+ println("shutdown")
+ }
+
+}
diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Main.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Main.scala
new file mode 100644
index 0000000..5b58875
--- /dev/null
+++ b/crashbox-server/src/main/scala/io/crashbox/ci/Main.scala
@@ -0,0 +1,26 @@
+package io.crashbox.ci
+
+import java.net.URL
+
+
+object Main extends Core
+ with Schedulers
+ with Builders
+ with Parsers
+ with Source
+ with StreamStore {
+
+ def main(args: Array[String]): Unit = {
+ reapDeadBuilds()
+
+ start(
+ "random_build",
+ new URL("file:///home/jodersky/tmp/dummy"),
+ () => saveStream("random_build"),
+ state => println(state)
+ )
+ Thread.sleep(15000)
+ System.exit(0)
+ }
+
+}
diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Parsers.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Parsers.scala
new file mode 100644
index 0000000..6f43380
--- /dev/null
+++ b/crashbox-server/src/main/scala/io/crashbox/ci/Parsers.scala
@@ -0,0 +1,36 @@
+package io.crashbox.ci
+
+import java.io.File
+import java.nio.file.Files
+import scala.collection.JavaConverters._
+
+trait Parsers {
+
+ def defaultImage = "crashbox/default"
+
+ case class BuildDef(
+ image: String,
+ script: String
+ )
+
+ case class ParseError(message: String)
+
+ def parseBuild(workdir: File): Either[BuildDef, ParseError] = {
+ val file = new File(workdir, ".crashbox.txt")
+ if (!file.exists()) {
+ return Right(ParseError("No build configuration file .crashbox.txt found."))
+ }
+
+ val lines = Files.readAllLines(file.toPath).asScala.map(_.trim)
+
+ val Pattern = """(\w+)\s*:\s*(.+)""".r
+
+ val image = lines.collectFirst{case Pattern("image", s) => s}.getOrElse(defaultImage)
+ val script = lines.collectFirst{case Pattern("script", s) => s}
+
+ script match {
+ case Some(s) => Left(BuildDef(image, s))
+ case None => Right(ParseError("No build script defined in configuration."))
+ }
+ }
+}
diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Schedulers.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Schedulers.scala
new file mode 100644
index 0000000..c860932
--- /dev/null
+++ b/crashbox-server/src/main/scala/io/crashbox/ci/Schedulers.scala
@@ -0,0 +1,156 @@
+package io.crashbox.ci
+
+import akka.actor.{ Actor, ActorLogging, ActorRef, OneForOneStrategy, Props, Terminated }
+import akka.stream.stage.GraphStageLogic
+import akka.stream.{ Attributes, Outlet, SourceShape }
+import akka.stream.stage.GraphStage
+import java.io.{ File, OutputStream }
+import java.net.URL
+import java.nio.file.Files
+import java.util.Base64
+import scala.collection.mutable.HashMap
+import scala.concurrent.{ Await, Future }
+import scala.concurrent.duration._
+import scala.util.control.NonFatal
+import akka.actor.SupervisorStrategy._
+import scala.util.{ Failure, Success }
+
+
+trait Schedulers extends { self: Core with Source with Builders with Parsers =>
+
+ private def newTempDir: File = Files.createTempDirectory("crashbox-run").toFile()
+
+ sealed trait BuildState
+ case class Cloning(url: URL) extends BuildState
+ case class Parsing(dir: File) extends BuildState
+ case class Starting(dir: File, buildDef: BuildDef) extends BuildState
+ case class Running(id: ContainerId) extends BuildState
+
+ sealed trait EndBuildState extends BuildState
+ case class Finished(status: Int) extends EndBuildState
+ case class Failed(message: String) extends EndBuildState
+
+ class BuildManager(
+ url: URL,
+ openOut: () => OutputStream,
+ update: BuildState => Unit
+ ) extends Actor with ActorLogging {
+
+ var buildDir: Option[File] = None
+ var out: Option[OutputStream] = None
+ var containerId: Option[ContainerId] = None
+
+ override def postStop() = {
+ containerId foreach { cancelBuild(_) }
+ out foreach { _.close() }
+ buildDir foreach { _.delete() }
+ }
+
+ override def preStart() = {
+ log.info(s"Started build manager for $url")
+ self ! Cloning(url)
+ }
+
+ override def receive: Receive = {
+
+ case state@Cloning(url) =>
+ log.debug("Update build state: cloning")
+ update(state)
+ fetchSource(url, newTempDir) onComplete {
+ case Success(dir) =>
+ self ! Parsing(dir)
+ case Failure(err) =>
+ self ! Failed(s"Error fetching source from $url")
+ }
+
+ case state@Parsing(src) =>
+ log.debug("Update build state: parsing")
+ update(state)
+ buildDir = Some(src)
+ parseBuild(src) match {
+ case Left(buildDef) =>
+ self ! Starting(src, buildDef)
+ case Right(err) =>
+ self ! Failed(s"Failed to parse build $err")
+ }
+
+ case state@Starting(src, bd) =>
+ log.debug("Update build state: starting")
+ update(state)
+ val so = openOut()
+ out = Some(so)
+ startBuild(bd.image, bd.script, src, so) onComplete {
+ case Success(id) =>
+ self ! Running(id)
+ case Failure(err) =>
+ self ! Failed(s"Failed to start build $err")
+ }
+
+ case state@Running(id) =>
+ log.debug("Update build state: running")
+ update(state)
+ containerId = Some(id)
+ waitBuild(id) onComplete {
+ case Success(status) =>
+ self ! Finished(status)
+ case Failure(err) =>
+ self ! Failed(s"Error waiting for build to complete")
+ }
+
+ case state@Finished(status) =>
+ log.debug("Update build state: finished")
+ update(state)
+ context stop self
+
+ case state@Failed(message) =>
+ log.debug("Update build state: failed")
+ update(state)
+ context stop self
+ }
+ }
+ object BuildManager {
+ def apply(buildId: String, url: URL, out: () => OutputStream, update: BuildState => Unit) =
+ Props(new BuildManager(url, out, update))
+ }
+
+ private sealed trait SchedulerCommand
+ private case class ScheduleBuild(
+ buildId: String, url: URL, out: () => OutputStream, update: BuildState => Unit
+ ) extends SchedulerCommand
+
+ class Scheduler extends Actor {
+
+ val runningBuilds = new HashMap[String, ActorRef]
+
+ override def receive = {
+
+ case sb: ScheduleBuild =>
+ runningBuilds.get(sb.buildId) match {
+ case Some(_) => //already running
+ case None =>
+ val buildManager = context.actorOf(BuildManager(
+ sb.buildId, sb.url, sb.out, sb.update), s"build-${sb.buildId}")
+ context watch buildManager
+ runningBuilds += sb.buildId -> buildManager
+ }
+
+ case Terminated(buildManager) =>
+ //TODO use a more efficient data structure
+ runningBuilds.find(_._2 == buildManager).foreach {
+ runningBuilds -= _._1
+ }
+ }
+ }
+
+ private val scheduler = system.actorOf(Props(new Scheduler()), "crashbox-scheduler")
+
+ def start(
+ buildId: String,
+ url: URL,
+ out: () => OutputStream,
+ update: BuildState => Unit
+ ): Unit = {
+ scheduler ! ScheduleBuild(buildId, url, out, update)
+ }
+
+}
diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Source.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Source.scala
new file mode 100644
index 0000000..720b809
--- /dev/null
+++ b/crashbox-server/src/main/scala/io/crashbox/ci/Source.scala
@@ -0,0 +1,16 @@
+package io.crashbox.ci
+
+import java.io.File
+import java.net.URL
+import org.eclipse.jgit.api.Git
+import scala.concurrent.Future
+
+trait Source { self: Core =>
+
+ def fetchSource(from: URL, to: File): Future[File] = Future {
+ log.debug(s"Cloning git repo from $from to $to")
+ Git.cloneRepository.setURI(from.toURI.toString).setDirectory(to).call()
+ to
+ }(blockingDispatcher)
+
+}
diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/StateStore.scala b/crashbox-server/src/main/scala/io/crashbox/ci/StateStore.scala
new file mode 100644
index 0000000..94f51d0
--- /dev/null
+++ b/crashbox-server/src/main/scala/io/crashbox/ci/StateStore.scala
@@ -0,0 +1,7 @@
+package io.crashbox.ci
+
+trait StateStore {
+
+}
+
+
diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/StreamStore.scala b/crashbox-server/src/main/scala/io/crashbox/ci/StreamStore.scala
new file mode 100644
index 0000000..5fd3769
--- /dev/null
+++ b/crashbox-server/src/main/scala/io/crashbox/ci/StreamStore.scala
@@ -0,0 +1,29 @@
+package io.crashbox.ci
+
+import java.io.{ File, FileInputStream, FileOutputStream, InputStream, OutputStream }
+import java.security.MessageDigest
+
+trait StreamStore { self: Core =>
+
+ val streamsDirectory: File = new File(config.getString("crashbox.streams.directory"))
+
+ private def logFile(id: String): File = {
+ val bytes = MessageDigest.getInstance("SHA-256").digest(id.getBytes)
+ val str = bytes.map{byte => Integer.toString((byte & 0xff) + 0x100, 16)}.mkString
+ val (head, tail) = str.splitAt(2)
+ new File(streamsDirectory, s"$head/$tail")
+ }
+
+ def saveStream(id: String): OutputStream = {
+ val file = logFile(id)
+ file.getParentFile.mkdirs()
+ file.createNewFile()
+ file.setWritable(true)
+ new FileOutputStream(file)
+ }
+
+ def readStream(id: String): InputStream = {
+ new FileInputStream(logFile(id))
+ }
+
+}