summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJakob Odersky <jakob@odersky.com>2017-03-11 14:33:01 -0800
committerJakob Odersky <jakob@odersky.com>2017-03-11 14:33:01 -0800
commit5b57a310aec9a6f8bf7d3b42786fe0d758d40083 (patch)
tree3dc9ce3bcc496d89b3909854c681c2a27f7a98eb
parent204376d7ede8f630d1f58aa5fea86f01f951ba8d (diff)
downloadcrashbox-ci-5b57a310aec9a6f8bf7d3b42786fe0d758d40083.tar.gz
crashbox-ci-5b57a310aec9a6f8bf7d3b42786fe0d758d40083.tar.bz2
crashbox-ci-5b57a310aec9a6f8bf7d3b42786fe0d758d40083.zip
Random build id
-rwxr-xr-xbuild9
-rw-r--r--crashbox-server/build.sbt2
-rw-r--r--crashbox-server/src/main/resources/reference.conf7
-rw-r--r--crashbox-server/src/main/scala/io/crashbox/ci/Core.scala10
-rw-r--r--crashbox-server/src/main/scala/io/crashbox/ci/Executors.scala (renamed from crashbox-server/src/main/scala/io/crashbox/ci/Builders.scala)36
-rw-r--r--crashbox-server/src/main/scala/io/crashbox/ci/HttpApi.scala55
-rw-r--r--crashbox-server/src/main/scala/io/crashbox/ci/Main.scala13
-rw-r--r--crashbox-server/src/main/scala/io/crashbox/ci/Schedulers.scala85
-rw-r--r--crashbox-server/src/main/scala/io/crashbox/ci/StateStore.scala3
-rw-r--r--crashbox-server/src/main/scala/io/crashbox/ci/Storage.scala62
-rw-r--r--crashbox-server/src/main/scala/io/crashbox/ci/StreamStore.scala38
-rw-r--r--crashbox-server/src/test/scala/io/crashbox/ci/TestUtil.scala2
-rw-r--r--project/Dependencies.scala3
13 files changed, 180 insertions, 145 deletions
diff --git a/build b/build
index d4f92c1..93013a1 100755
--- a/build
+++ b/build
@@ -3,13 +3,18 @@
case "$1" in
start)
shift
- curl -i -X POST -H "Content-Type: application/json" "http://localhost:9000/api/submit" \
+ curl -i -sS -X POST -H "Content-Type: application/json" "http://localhost:9000/api/submit" \
-d "{\"url\": \"$1\"}"
;;
cancel)
shift
- curl -i -X POST "http://localhost:9000/api/$1/cancel"
+ curl -i -sS -X POST "http://localhost:9000/api/$1/cancel"
;;
+ logs)
+ shift
+ curl -i -sS -X GET "http://localhost:9000/api/$1/logs"
+ ;;
+
*)
echo "invalid command $1" >&2
exit 2
diff --git a/crashbox-server/build.sbt b/crashbox-server/build.sbt
index 3de6438..ed4179d 100644
--- a/crashbox-server/build.sbt
+++ b/crashbox-server/build.sbt
@@ -9,5 +9,7 @@ libraryDependencies ++= Seq(
Dependencies.jgitArchive,
Dependencies.jgitServer,
Dependencies.dockerClient,
+ Dependencies.slick,
+ "com.h2database" % "h2" % "1.4.193",
Dependencies.scalatest % Test
)
diff --git a/crashbox-server/src/main/resources/reference.conf b/crashbox-server/src/main/resources/reference.conf
index bfcd645..3ac6de7 100644
--- a/crashbox-server/src/main/resources/reference.conf
+++ b/crashbox-server/src/main/resources/reference.conf
@@ -15,6 +15,13 @@ crashbox {
streams {
directory = "streams"
}
+
+ db {
+ url = "jdbc:h2:mem:test1"
+ driver = org.h2.Driver
+ connectionPool = disabled
+ keepAliveConnection = true
+ }
}
diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Core.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Core.scala
index 78a4252..39a1aec 100644
--- a/crashbox-server/src/main/scala/io/crashbox/ci/Core.scala
+++ b/crashbox-server/src/main/scala/io/crashbox/ci/Core.scala
@@ -4,7 +4,9 @@ import scala.concurrent.{Await, ExecutionContext}
import scala.concurrent.duration.Duration
import akka.actor.ActorSystem
+import akka.event.LoggingAdapter
import akka.stream.ActorMaterializer
+import com.typesafe.config.Config
trait Core {
@@ -14,13 +16,13 @@ trait Core {
val blockingDispatcher: ExecutionContext =
system.dispatchers.lookup("crashbox.blocking-dispatcher")
- def log = system.log
- def config = system.settings.config
+ def log: LoggingAdapter = system.log
+ def config: Config = system.settings.config
sys.addShutdownHook {
- log.info("Shutting down systm")
+ log.info("Shutting down core system")
Await.ready(system.terminate(), Duration.Inf)
- println("shutdown")
+ log.info("System stopped")
}
}
diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Builders.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Executors.scala
index 1532832..045bf89 100644
--- a/crashbox-server/src/main/scala/io/crashbox/ci/Builders.scala
+++ b/crashbox-server/src/main/scala/io/crashbox/ci/Executors.scala
@@ -16,7 +16,7 @@ import com.spotify.docker.client.exceptions.ContainerNotFoundException
import com.spotify.docker.client.messages.{ContainerConfig, HostConfig}
import com.spotify.docker.client.messages.HostConfig.Bind
-trait Builders { core: Core =>
+trait Executors { core: Core =>
val dockerClient =
DefaultDockerClient.builder().uri("unix:///run/docker.sock").build()
@@ -29,14 +29,16 @@ trait Builders { core: Core =>
def containerWorkDirectory = "/home/crashbox"
def containerKillTimeout = 10.seconds
- case class ContainerId(id: String) {
- override def toString = id
+ case class ExecutionId(containerId: String) {
+ override def toString = containerId
}
- def startBuild(image: String,
- script: String,
- dir: File,
- out: OutputStream): Future[ContainerId] =
+ def startExecution(
+ image: String,
+ script: String,
+ dir: File,
+ out: OutputStream
+ ): Future[ExecutionId] =
Future {
val volume = Bind
.builder()
@@ -77,23 +79,24 @@ trait Builders { core: Core =>
}
}
}
- ContainerId(container)
+ ExecutionId(container)
}(blockingDispatcher)
- def waitBuild(id: ContainerId): Future[Int] =
+ def waitExecution(id: ExecutionId): Future[Int] =
Future {
log.debug(s"Waiting for container $id to exit")
- val res: Int = dockerClient.waitContainer(id.id).statusCode()
- cancelBuild(id)
+ val res: Int = dockerClient.waitContainer(id.containerId).statusCode()
+ cancelExecution(id)
res
}(blockingDispatcher)
- def cancelBuild(id: ContainerId): Unit = {
- log.debug(s"Stopping container $id")
+ def cancelExecution(id: ExecutionId): Unit = {
try {
- dockerClient.stopContainer(id.id,
+ log.debug(s"Stopping container $id")
+ dockerClient.stopContainer(id.containerId,
containerKillTimeout.toUnit(SECONDS).toInt)
- dockerClient.removeContainer(id.id)
+ log.debug(s"Removing container $id")
+ dockerClient.removeContainer(id.containerId)
} catch {
case _: ContainerNotFoundException => // build already cancelled
}
@@ -107,7 +110,8 @@ trait Builders { core: Core =>
)
.asScala
stale.foreach { container =>
- dockerClient.removeContainer(container.id())
+ log.warning(s"Removing stale container ${container.id}")
+ dockerClient.removeContainer(container.id)
}
}
diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/HttpApi.scala b/crashbox-server/src/main/scala/io/crashbox/ci/HttpApi.scala
index 71a4f5b..c9e62c3 100644
--- a/crashbox-server/src/main/scala/io/crashbox/ci/HttpApi.scala
+++ b/crashbox-server/src/main/scala/io/crashbox/ci/HttpApi.scala
@@ -1,7 +1,7 @@
package io.crashbox.ci
import java.net.URL
-import java.security.MessageDigest
+import java.util.UUID
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.marshalling.{Marshaller, ToResponseMarshaller}
@@ -10,24 +10,27 @@ import akka.http.scaladsl.model.HttpEntity.ChunkStreamPart
import akka.http.scaladsl.model.HttpResponse
import akka.http.scaladsl.server._
import akka.http.scaladsl.server.Directives._
-import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Source => Src}
-import spray.json.DefaultJsonProtocol
+import akka.stream.scaladsl.StreamConverters
+import spray.json._
-trait HttpApi { self: Core with Schedulers with StreamStore =>
+trait HttpApi { self: Core with Schedulers with Storage =>
val endpoint = "api"
- case class Request(url: String) {
- def buildId: String = {
- val bytes = MessageDigest.getInstance("SHA-256").digest(url.getBytes)
- bytes.map { byte =>
- Integer.toString((byte & 0xff) + 0x100, 16)
- }.mkString
- }
- }
+ case class Request(url: URL) {}
object Protocol extends DefaultJsonProtocol {
+ val urlReader = new JsonReader[URL] {
+ override def read(js: JsValue) = js match {
+ case JsString(str) => new URL(str)
+ case _ => deserializationError("Expected valid url string")
+ }
+ }
+ val urlWriter = new JsonWriter[URL] {
+ override def write(url: URL) = JsString(url.toString())
+ }
+ implicit val urlFormat: JsonFormat[URL] = jsonFormat(urlReader, urlWriter)
implicit val request = jsonFormat1(Request)
}
import Protocol._
@@ -43,25 +46,25 @@ trait HttpApi { self: Core with Schedulers with StreamStore =>
path("submit") {
post {
entity(as[Request]) { req =>
- val source = Src
- .queue[String](100, OverflowStrategy.fail)
- .mapMaterializedValue { q =>
- q.offer(s"Build ID: ${req.buildId}")
- start(
- req.buildId,
- new URL(req.url),
- () => saveStream(req.buildId),
- state => q.offer(state.toString)
- )
- }
- complete(source)
+ val scheduled = scheduleBuild(req.url).map(_.toString())
+ complete(scheduled)
}
}
} ~
path(Segment / "cancel") { buildId =>
post {
- cancel(buildId)
- complete(204 -> s"Cancelled $buildId")
+ cancelBuild(UUID.fromString(buildId))
+ complete(204 -> None)
+ }
+ } ~
+ path(Segment / "logs") { buildId =>
+ get {
+ val src = StreamConverters
+ .fromInputStream(() => readLog(UUID.fromString(buildId), 0))
+ .map { bs =>
+ bs.utf8String
+ }
+ complete(src)
}
}
}
diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Main.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Main.scala
index 4961b86..74a5527 100644
--- a/crashbox-server/src/main/scala/io/crashbox/ci/Main.scala
+++ b/crashbox-server/src/main/scala/io/crashbox/ci/Main.scala
@@ -7,23 +7,22 @@ import akka.http.scaladsl.Http
object Main
extends Core
with Schedulers
- with Builders
+ with Storage
+ with Executors
with Parsers
with Source
- with StreamStore
with HttpApi {
def main(args: Array[String]): Unit = {
reapDeadBuilds()
- val host = system.settings.config.getString("crashbox.host")
- val port = system.settings.config.getInt("crashbox.port")
-
+ val host = config.getString("crashbox.host")
+ val port = config.getInt("crashbox.port")
Http(system).bindAndHandle(httpApi, host, port) onComplete {
case Success(_) =>
- system.log.info(s"Listening on $host:$port")
+ log.info(s"Listening on $host:$port")
case Failure(ex) =>
- system.log.error(ex, s"Failed to bind to $host:$port")
+ log.error(ex, s"Failed to bind to $host:$port")
system.terminate()
}
}
diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Schedulers.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Schedulers.scala
index 3600fc1..cb4e793 100644
--- a/crashbox-server/src/main/scala/io/crashbox/ci/Schedulers.scala
+++ b/crashbox-server/src/main/scala/io/crashbox/ci/Schedulers.scala
@@ -5,11 +5,15 @@ import java.net.URL
import java.nio.file.Files
import scala.collection.mutable.HashMap
+import scala.concurrent.Future
+import scala.concurrent.duration._
import scala.util.{Failure, Success}
import akka.actor.{Actor, ActorLogging, ActorRef, Props, Terminated}
+import akka.util.Timeout
-trait Schedulers extends { self: Core with Source with Builders with Parsers =>
+trait Schedulers {
+ self: Core with Source with Executors with Parsers with Storage =>
private def newTempDir: File =
Files.createTempDirectory("crashbox-run").toFile()
@@ -18,25 +22,24 @@ trait Schedulers extends { self: Core with Source with Builders with Parsers =>
case class Cloning(url: URL) extends BuildState
case class Parsing(dir: File) extends BuildState
case class Starting(dir: File, buildDef: BuildDef) extends BuildState
- case class Running(id: ContainerId) extends BuildState
+ case class Running(id: ExecutionId) extends BuildState
sealed trait EndBuildState extends BuildState
case class Finished(status: Int) extends EndBuildState
case class Failed(message: String) extends EndBuildState
class BuildManager(
- url: URL,
- openOut: () => OutputStream,
- update: BuildState => Unit
+ buildId: BuildId,
+ url: URL
) extends Actor
with ActorLogging {
var buildDir: Option[File] = None
var out: Option[OutputStream] = None
- var containerId: Option[ContainerId] = None
+ var containerId: Option[ExecutionId] = None
override def postStop() = {
- containerId foreach { cancelBuild(_) }
+ containerId foreach { cancelExecution(_) }
out foreach { _.close() }
buildDir foreach { _.delete() }
log.info(s"Stopped build of $url")
@@ -51,7 +54,7 @@ trait Schedulers extends { self: Core with Source with Builders with Parsers =>
case state @ Cloning(url) =>
log.debug("Update build state: cloning")
- update(state)
+ updateBuildState(buildId, state)
fetchSource(url, newTempDir) onComplete {
case Success(dir) =>
self ! Parsing(dir)
@@ -61,7 +64,7 @@ trait Schedulers extends { self: Core with Source with Builders with Parsers =>
case state @ Parsing(src) =>
log.debug("Update build state: parsing")
- update(state)
+ updateBuildState(buildId, state)
buildDir = Some(src)
parseBuild(src) match {
case Left(buildDef) =>
@@ -72,10 +75,10 @@ trait Schedulers extends { self: Core with Source with Builders with Parsers =>
case state @ Starting(src, bd) =>
log.debug("Update build state: starting")
- update(state)
- val so = openOut()
+ updateBuildState(buildId, state)
+ val so = saveLog(buildId, 0)
out = Some(so)
- startBuild(bd.image, bd.script, src, so) onComplete {
+ startExecution(bd.image, bd.script, src, so) onComplete {
case Success(id) =>
self ! Running(id)
case Failure(err) =>
@@ -84,9 +87,9 @@ trait Schedulers extends { self: Core with Source with Builders with Parsers =>
case state @ Running(id) =>
log.debug("Update build state: running")
- update(state)
+ updateBuildState(buildId, state)
containerId = Some(id)
- waitBuild(id) onComplete {
+ waitExecution(id) onComplete {
case Success(status) =>
self ! Finished(status)
case Failure(err) =>
@@ -95,48 +98,36 @@ trait Schedulers extends { self: Core with Source with Builders with Parsers =>
case state @ Finished(status) =>
log.debug("Update build state: finished")
- update(state)
+ updateBuildState(buildId, state)
context stop self
case state @ Failed(message) =>
log.debug("Update build state: failed")
- update(state)
+ updateBuildState(buildId, state)
context stop self
}
}
object BuildManager {
- def apply(buildId: String,
- url: URL,
- out: () => OutputStream,
- update: BuildState => Unit) =
- Props(new BuildManager(url, out, update))
+ def apply(buildId: BuildId, url: URL) =
+ Props(new BuildManager(buildId, url))
}
private sealed trait SchedulerCommand
- private case class ScheduleBuild(
- buildId: String,
- url: URL,
- out: () => OutputStream,
- update: BuildState => Unit
- ) extends SchedulerCommand
- private case class CancelBuild(buildId: String) extends SchedulerCommand
+ private case class ScheduleBuild(url: URL) extends SchedulerCommand
+ private case class CancelBuild(buildId: BuildId) extends SchedulerCommand
class Scheduler extends Actor {
-
- val runningBuilds = new HashMap[String, ActorRef]
+ val runningBuilds = new HashMap[BuildId, ActorRef]
override def receive = {
- case sb: ScheduleBuild =>
- runningBuilds.get(sb.buildId) match {
- case Some(_) => //already running
- case None =>
- val buildManager = context.actorOf(
- BuildManager(sb.buildId, sb.url, sb.out, sb.update),
- s"build-${sb.buildId}")
- context watch buildManager
- runningBuilds += sb.buildId -> buildManager
- }
+ case ScheduleBuild(url) =>
+ val buildId = newBuildId()
+ val buildManager =
+ context.actorOf(BuildManager(buildId, url), s"build-${buildId}")
+ context watch buildManager
+ runningBuilds += buildId -> buildManager
+ sender ! buildId
case CancelBuild(id) =>
runningBuilds.get(id).foreach { builder =>
@@ -154,16 +145,14 @@ trait Schedulers extends { self: Core with Source with Builders with Parsers =>
private val scheduler =
system.actorOf(Props(new Scheduler()), "crashbox-scheduler")
- def start(
- buildId: String,
- url: URL,
- out: () => OutputStream,
- update: BuildState => Unit
- ): Unit = {
- scheduler ! ScheduleBuild(buildId, url, out, update)
+ // None if build can not be scheduled (queue is full)
+ def scheduleBuild(url: URL): Future[BuildId] = {
+ import akka.pattern.ask
+ implicit val timeout: Timeout = Timeout(5.seconds)
+ (scheduler ? ScheduleBuild(url)).mapTo[BuildId]
}
- def cancel(buildId: String): Unit = {
+ def cancelBuild(buildId: BuildId): Unit = {
scheduler ! CancelBuild(buildId)
}
diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/StateStore.scala b/crashbox-server/src/main/scala/io/crashbox/ci/StateStore.scala
deleted file mode 100644
index 3474407..0000000
--- a/crashbox-server/src/main/scala/io/crashbox/ci/StateStore.scala
+++ /dev/null
@@ -1,3 +0,0 @@
-package io.crashbox.ci
-
-trait StateStore {}
diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Storage.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Storage.scala
new file mode 100644
index 0000000..1035940
--- /dev/null
+++ b/crashbox-server/src/main/scala/io/crashbox/ci/Storage.scala
@@ -0,0 +1,62 @@
+package io.crashbox.ci
+
+import java.io.{
+ BufferedInputStream,
+ File,
+ FileInputStream,
+ FileOutputStream,
+ InputStream,
+ OutputStream
+}
+import java.util.UUID
+
+import slick.driver.H2Driver.api._
+
+trait Storage { self: Core with Parsers with Schedulers =>
+
+ type BuildId = UUID
+
+
+
+ def newBuildId() = UUID.randomUUID()
+
+
+
+ private val streamsDirectory: File = new File(
+ config.getString("crashbox.streams.directory"))
+
+ private def logFile(buildId: BuildId, task: Int): File = {
+ def stringifyId(id: BuildId): String = {
+ val bytes = new Array[Byte](16) // 128 bits
+ for (i <- 0 until 8) {
+ bytes(i) = ((id.getLeastSignificantBits >> i) & 0xff).toByte
+ }
+ for (i <- 0 until 8) {
+ bytes(8 + i) = ((id.getMostSignificantBits >> i) & 0xff).toByte
+ }
+ bytes.map { byte =>
+ Integer.toString((byte & 0xff) + 0x100, 16)
+ }.mkString
+ }
+ val (dir1, tail) = stringifyId(buildId).splitAt(2)
+ val (dir2, dir3) = tail.splitAt(2)
+ new File(streamsDirectory, s"$dir1/$dir2/$dir3/$task")
+ }
+
+ def saveLog(buildId: BuildId, task: Int): OutputStream = {
+ val file = logFile(buildId, task)
+ file.getParentFile.mkdirs()
+ file.createNewFile()
+ file.setWritable(true)
+ new FileOutputStream(file)
+ }
+
+ def readLog(buildId: BuildId, task: Int): InputStream = {
+ new FileInputStream(logFile(buildId, task))
+ }
+
+ def updateBuildState(buildId: BuildId, state: BuildState) = {
+ log.info(s"Build $buildId: state update $state")
+ }
+
+}
diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/StreamStore.scala b/crashbox-server/src/main/scala/io/crashbox/ci/StreamStore.scala
deleted file mode 100644
index bba3cdf..0000000
--- a/crashbox-server/src/main/scala/io/crashbox/ci/StreamStore.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-package io.crashbox.ci
-
-import java.io.{
- File,
- FileInputStream,
- FileOutputStream,
- InputStream,
- OutputStream
-}
-import java.security.MessageDigest
-
-trait StreamStore { self: Core =>
-
- val streamsDirectory: File = new File(
- config.getString("crashbox.streams.directory"))
-
- private def logFile(id: String): File = {
- val bytes = MessageDigest.getInstance("SHA-256").digest(id.getBytes)
- val str = bytes.map { byte =>
- Integer.toString((byte & 0xff) + 0x100, 16)
- }.mkString
- val (head, tail) = str.splitAt(2)
- new File(streamsDirectory, s"$head/$tail")
- }
-
- def saveStream(id: String): OutputStream = {
- val file = logFile(id)
- file.getParentFile.mkdirs()
- file.createNewFile()
- file.setWritable(true)
- new FileOutputStream(file)
- }
-
- def readStream(id: String): InputStream = {
- new FileInputStream(logFile(id))
- }
-
-}
diff --git a/crashbox-server/src/test/scala/io/crashbox/ci/TestUtil.scala b/crashbox-server/src/test/scala/io/crashbox/ci/TestUtil.scala
index a7e7ae6..2c68d87 100644
--- a/crashbox-server/src/test/scala/io/crashbox/ci/TestUtil.scala
+++ b/crashbox-server/src/test/scala/io/crashbox/ci/TestUtil.scala
@@ -1,6 +1,6 @@
package io.crashbox.ci
-import java.io.{File, OutputStream}
+import java.io.File
import java.nio.file.Files
object TestUtil {
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 2d768d7..6038711 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -15,6 +15,9 @@ object Dependencies {
val dockerClient = "com.spotify" % "docker-client" % "8.1.1"
+ val slick = "com.typesafe.slick" %% "slick" % "3.2.0"
+ //"com.typesafe.slick" %% "slick-hikaricp" % "3.2.0"
+
val scalatest = "org.scalatest" %% "scalatest" % "3.0.1"
}