diff options
author | Jakob Odersky <jakob@odersky.com> | 2017-03-06 01:32:39 -0800 |
---|---|---|
committer | Jakob Odersky <jakob@odersky.com> | 2017-03-06 01:32:39 -0800 |
commit | 374ac0ce8c10e9063c263ecf9c708f9ee6772973 (patch) | |
tree | a80f53db0a5ee8e3c40e1941b331869a2560164d /crashbox-server/src/main/scala/io/crashbox | |
parent | f9cc5e3f8478eb9f19cca093b8019e579f9c87e6 (diff) | |
download | crashbox-ci-374ac0ce8c10e9063c263ecf9c708f9ee6772973.tar.gz crashbox-ci-374ac0ce8c10e9063c263ecf9c708f9ee6772973.tar.bz2 crashbox-ci-374ac0ce8c10e9063c263ecf9c708f9ee6772973.zip |
Basic, single-task builder
Diffstat (limited to 'crashbox-server/src/main/scala/io/crashbox')
8 files changed, 392 insertions, 0 deletions
diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Builders.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Builders.scala new file mode 100644 index 0000000..7e55640 --- /dev/null +++ b/crashbox-server/src/main/scala/io/crashbox/ci/Builders.scala @@ -0,0 +1,100 @@ +package io.crashbox.ci + +import com.spotify.docker.client.DockerClient.{ AttachParameter, ListContainersParam } +import com.spotify.docker.client.LogStream +import com.spotify.docker.client.exceptions.ContainerNotFoundException +import com.spotify.docker.client.messages.{ ContainerConfig, HostConfig, LogConfig } +import com.spotify.docker.client.messages.HostConfig.Bind +import java.io.{ File, OutputStream } +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.collection.JavaConverters._ +import com.spotify.docker.client.DefaultDockerClient + +trait Builders { core: Core => + + val dockerClient = DefaultDockerClient.builder().uri("unix:///run/docker.sock").build() + + core.system.registerOnTermination{ + dockerClient.close() + } + + def containerUser = "crashbox" + def containerWorkDirectory = "/home/crashbox" + def containerKillTimeout = 10.seconds + + case class ContainerId(id: String) { + override def toString = id + } + + def startBuild( + image: String, + script: String, + dir: File, + out: OutputStream + ): Future[ContainerId] = Future { + val volume = Bind.builder().from(dir.getAbsolutePath).to(containerWorkDirectory).build() + val hostConfig = HostConfig.builder().binds(volume).build() + val containerConfig = ContainerConfig.builder() + .labels(Map("crashbox" -> "build").asJava) + .hostConfig(hostConfig) + .tty(true) // combine stdout and stderr into stdout + .image(image) + .user(containerUser) + .workingDir(containerWorkDirectory) + .entrypoint("/bin/sh", "-c") + .cmd(script) + .build() + val container = dockerClient.createContainer(containerConfig).id + + log.debug(s"Starting container $container") + dockerClient.startContainer(container) + + log.debug(s"Attaching log stream of container $container") + blockingDispatcher execute new Runnable { + override def run() = { + var stream: LogStream = null + try { + stream = dockerClient.attachContainer( + container, + AttachParameter.LOGS, + AttachParameter.STDOUT, + AttachParameter.STREAM + ) + stream.attach(out, null, true) + } finally { + if (stream != null) stream.close() + } + } + } + ContainerId(container) + }(blockingDispatcher) + + def waitBuild(id: ContainerId): Future[Int] = Future { + log.debug(s"Waiting for container $id to exit") + val res: Int = dockerClient.waitContainer(id.id).statusCode() + cancelBuild(id) + res + }(blockingDispatcher) + + def cancelBuild(id: ContainerId): Unit = { + log.debug(s"Stopping container $id") + try { + dockerClient.stopContainer(id.id, containerKillTimeout.toUnit(SECONDS).toInt) + dockerClient.removeContainer(id.id) + } catch { + case _: ContainerNotFoundException => // build already cancelled + } + } + + def reapDeadBuilds(): Unit = { + val stale = dockerClient.listContainers( + ListContainersParam.withLabel("crashbox"), + ListContainersParam.withStatusExited() + ).asScala + stale.foreach { container => + dockerClient.removeContainer(container.id()) + } + } + +} diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Core.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Core.scala new file mode 100644 index 0000000..8342293 --- /dev/null +++ b/crashbox-server/src/main/scala/io/crashbox/ci/Core.scala @@ -0,0 +1,22 @@ +package io.crashbox.ci + +import akka.actor.ActorSystem +import scala.concurrent.duration.Duration +import scala.concurrent.{ Await, ExecutionContext } + +trait Core { + + implicit val system: ActorSystem = ActorSystem("crashbox") + implicit val executionContext: ExecutionContext = system.dispatcher + val blockingDispatcher: ExecutionContext = system.dispatchers.lookup("crashbox.blocking-dispatcher") + + def log = system.log + def config = system.settings.config + + sys.addShutdownHook { + log.info("Shutting down systm") + Await.ready(system.terminate(), Duration.Inf) + println("shutdown") + } + +} diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Main.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Main.scala new file mode 100644 index 0000000..5b58875 --- /dev/null +++ b/crashbox-server/src/main/scala/io/crashbox/ci/Main.scala @@ -0,0 +1,26 @@ +package io.crashbox.ci + +import java.net.URL + + +object Main extends Core + with Schedulers + with Builders + with Parsers + with Source + with StreamStore { + + def main(args: Array[String]): Unit = { + reapDeadBuilds() + + start( + "random_build", + new URL("file:///home/jodersky/tmp/dummy"), + () => saveStream("random_build"), + state => println(state) + ) + Thread.sleep(15000) + System.exit(0) + } + +} diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Parsers.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Parsers.scala new file mode 100644 index 0000000..6f43380 --- /dev/null +++ b/crashbox-server/src/main/scala/io/crashbox/ci/Parsers.scala @@ -0,0 +1,36 @@ +package io.crashbox.ci + +import java.io.File +import java.nio.file.Files +import scala.collection.JavaConverters._ + +trait Parsers { + + def defaultImage = "crashbox/default" + + case class BuildDef( + image: String, + script: String + ) + + case class ParseError(message: String) + + def parseBuild(workdir: File): Either[BuildDef, ParseError] = { + val file = new File(workdir, ".crashbox.txt") + if (!file.exists()) { + return Right(ParseError("No build configuration file .crashbox.txt found.")) + } + + val lines = Files.readAllLines(file.toPath).asScala.map(_.trim) + + val Pattern = """(\w+)\s*:\s*(.+)""".r + + val image = lines.collectFirst{case Pattern("image", s) => s}.getOrElse(defaultImage) + val script = lines.collectFirst{case Pattern("script", s) => s} + + script match { + case Some(s) => Left(BuildDef(image, s)) + case None => Right(ParseError("No build script defined in configuration.")) + } + } +} diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Schedulers.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Schedulers.scala new file mode 100644 index 0000000..c860932 --- /dev/null +++ b/crashbox-server/src/main/scala/io/crashbox/ci/Schedulers.scala @@ -0,0 +1,156 @@ +package io.crashbox.ci + +import akka.actor.{ Actor, ActorLogging, ActorRef, OneForOneStrategy, Props, Terminated } +import akka.stream.stage.GraphStageLogic +import akka.stream.{ Attributes, Outlet, SourceShape } +import akka.stream.stage.GraphStage +import java.io.{ File, OutputStream } +import java.net.URL +import java.nio.file.Files +import java.util.Base64 +import scala.collection.mutable.HashMap +import scala.concurrent.{ Await, Future } +import scala.concurrent.duration._ +import scala.util.control.NonFatal +import akka.actor.SupervisorStrategy._ +import scala.util.{ Failure, Success } + + +trait Schedulers extends { self: Core with Source with Builders with Parsers => + + private def newTempDir: File = Files.createTempDirectory("crashbox-run").toFile() + + sealed trait BuildState + case class Cloning(url: URL) extends BuildState + case class Parsing(dir: File) extends BuildState + case class Starting(dir: File, buildDef: BuildDef) extends BuildState + case class Running(id: ContainerId) extends BuildState + + sealed trait EndBuildState extends BuildState + case class Finished(status: Int) extends EndBuildState + case class Failed(message: String) extends EndBuildState + + class BuildManager( + url: URL, + openOut: () => OutputStream, + update: BuildState => Unit + ) extends Actor with ActorLogging { + + var buildDir: Option[File] = None + var out: Option[OutputStream] = None + var containerId: Option[ContainerId] = None + + override def postStop() = { + containerId foreach { cancelBuild(_) } + out foreach { _.close() } + buildDir foreach { _.delete() } + } + + override def preStart() = { + log.info(s"Started build manager for $url") + self ! Cloning(url) + } + + override def receive: Receive = { + + case state@Cloning(url) => + log.debug("Update build state: cloning") + update(state) + fetchSource(url, newTempDir) onComplete { + case Success(dir) => + self ! Parsing(dir) + case Failure(err) => + self ! Failed(s"Error fetching source from $url") + } + + case state@Parsing(src) => + log.debug("Update build state: parsing") + update(state) + buildDir = Some(src) + parseBuild(src) match { + case Left(buildDef) => + self ! Starting(src, buildDef) + case Right(err) => + self ! Failed(s"Failed to parse build $err") + } + + case state@Starting(src, bd) => + log.debug("Update build state: starting") + update(state) + val so = openOut() + out = Some(so) + startBuild(bd.image, bd.script, src, so) onComplete { + case Success(id) => + self ! Running(id) + case Failure(err) => + self ! Failed(s"Failed to start build $err") + } + + case state@Running(id) => + log.debug("Update build state: running") + update(state) + containerId = Some(id) + waitBuild(id) onComplete { + case Success(status) => + self ! Finished(status) + case Failure(err) => + self ! Failed(s"Error waiting for build to complete") + } + + case state@Finished(status) => + log.debug("Update build state: finished") + update(state) + context stop self + + case state@Failed(message) => + log.debug("Update build state: failed") + update(state) + context stop self + } + } + object BuildManager { + def apply(buildId: String, url: URL, out: () => OutputStream, update: BuildState => Unit) = + Props(new BuildManager(url, out, update)) + } + + private sealed trait SchedulerCommand + private case class ScheduleBuild( + buildId: String, url: URL, out: () => OutputStream, update: BuildState => Unit + ) extends SchedulerCommand + + class Scheduler extends Actor { + + val runningBuilds = new HashMap[String, ActorRef] + + override def receive = { + + case sb: ScheduleBuild => + runningBuilds.get(sb.buildId) match { + case Some(_) => //already running + case None => + val buildManager = context.actorOf(BuildManager( + sb.buildId, sb.url, sb.out, sb.update), s"build-${sb.buildId}") + context watch buildManager + runningBuilds += sb.buildId -> buildManager + } + + case Terminated(buildManager) => + //TODO use a more efficient data structure + runningBuilds.find(_._2 == buildManager).foreach { + runningBuilds -= _._1 + } + } + } + + private val scheduler = system.actorOf(Props(new Scheduler()), "crashbox-scheduler") + + def start( + buildId: String, + url: URL, + out: () => OutputStream, + update: BuildState => Unit + ): Unit = { + scheduler ! ScheduleBuild(buildId, url, out, update) + } + +} diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Source.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Source.scala new file mode 100644 index 0000000..720b809 --- /dev/null +++ b/crashbox-server/src/main/scala/io/crashbox/ci/Source.scala @@ -0,0 +1,16 @@ +package io.crashbox.ci + +import java.io.File +import java.net.URL +import org.eclipse.jgit.api.Git +import scala.concurrent.Future + +trait Source { self: Core => + + def fetchSource(from: URL, to: File): Future[File] = Future { + log.debug(s"Cloning git repo from $from to $to") + Git.cloneRepository.setURI(from.toURI.toString).setDirectory(to).call() + to + }(blockingDispatcher) + +} diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/StateStore.scala b/crashbox-server/src/main/scala/io/crashbox/ci/StateStore.scala new file mode 100644 index 0000000..94f51d0 --- /dev/null +++ b/crashbox-server/src/main/scala/io/crashbox/ci/StateStore.scala @@ -0,0 +1,7 @@ +package io.crashbox.ci + +trait StateStore { + +} + + diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/StreamStore.scala b/crashbox-server/src/main/scala/io/crashbox/ci/StreamStore.scala new file mode 100644 index 0000000..5fd3769 --- /dev/null +++ b/crashbox-server/src/main/scala/io/crashbox/ci/StreamStore.scala @@ -0,0 +1,29 @@ +package io.crashbox.ci + +import java.io.{ File, FileInputStream, FileOutputStream, InputStream, OutputStream } +import java.security.MessageDigest + +trait StreamStore { self: Core => + + val streamsDirectory: File = new File(config.getString("crashbox.streams.directory")) + + private def logFile(id: String): File = { + val bytes = MessageDigest.getInstance("SHA-256").digest(id.getBytes) + val str = bytes.map{byte => Integer.toString((byte & 0xff) + 0x100, 16)}.mkString + val (head, tail) = str.splitAt(2) + new File(streamsDirectory, s"$head/$tail") + } + + def saveStream(id: String): OutputStream = { + val file = logFile(id) + file.getParentFile.mkdirs() + file.createNewFile() + file.setWritable(true) + new FileOutputStream(file) + } + + def readStream(id: String): InputStream = { + new FileInputStream(logFile(id)) + } + +} |