blob: 045bf899e92f6f278204bfed844f7c821a2ebe7f (
plain) (
tree)
|
|
package io.crashbox.ci
import java.io.{File, OutputStream}
import scala.collection.JavaConverters._
import scala.concurrent.Future
import scala.concurrent.duration._
import com.spotify.docker.client.DefaultDockerClient
import com.spotify.docker.client.DockerClient.{
AttachParameter,
ListContainersParam
}
import com.spotify.docker.client.LogStream
import com.spotify.docker.client.exceptions.ContainerNotFoundException
import com.spotify.docker.client.messages.{ContainerConfig, HostConfig}
import com.spotify.docker.client.messages.HostConfig.Bind
trait Executors { core: Core =>
val dockerClient =
DefaultDockerClient.builder().uri("unix:///run/docker.sock").build()
core.system.registerOnTermination {
dockerClient.close()
}
def containerUser = "crashbox"
def containerWorkDirectory = "/home/crashbox"
def containerKillTimeout = 10.seconds
case class ExecutionId(containerId: String) {
override def toString = containerId
}
def startExecution(
image: String,
script: String,
dir: File,
out: OutputStream
): Future[ExecutionId] =
Future {
val volume = Bind
.builder()
.from(dir.getAbsolutePath)
.to(containerWorkDirectory)
.build()
val hostConfig = HostConfig.builder().binds(volume).build()
val containerConfig = ContainerConfig
.builder()
.labels(Map("crashbox" -> "build").asJava)
.hostConfig(hostConfig)
.tty(true) // combine stdout and stderr into stdout
.image(image)
.user(containerUser)
.workingDir(containerWorkDirectory)
.entrypoint("/bin/sh", "-c")
.cmd(script)
.build()
val container = dockerClient.createContainer(containerConfig).id
log.debug(s"Starting container $container")
dockerClient.startContainer(container)
log.debug(s"Attaching log stream of container $container")
blockingDispatcher execute new Runnable {
override def run() = {
var stream: LogStream = null
try {
stream = dockerClient.attachContainer(
container,
AttachParameter.LOGS,
AttachParameter.STDOUT,
AttachParameter.STREAM
)
stream.attach(out, null, true)
} finally {
if (stream != null) stream.close()
}
}
}
ExecutionId(container)
}(blockingDispatcher)
def waitExecution(id: ExecutionId): Future[Int] =
Future {
log.debug(s"Waiting for container $id to exit")
val res: Int = dockerClient.waitContainer(id.containerId).statusCode()
cancelExecution(id)
res
}(blockingDispatcher)
def cancelExecution(id: ExecutionId): Unit = {
try {
log.debug(s"Stopping container $id")
dockerClient.stopContainer(id.containerId,
containerKillTimeout.toUnit(SECONDS).toInt)
log.debug(s"Removing container $id")
dockerClient.removeContainer(id.containerId)
} catch {
case _: ContainerNotFoundException => // build already cancelled
}
}
def reapDeadBuilds(): Unit = {
val stale = dockerClient
.listContainers(
ListContainersParam.withLabel("crashbox"),
ListContainersParam.withStatusExited()
)
.asScala
stale.foreach { container =>
log.warning(s"Removing stale container ${container.id}")
dockerClient.removeContainer(container.id)
}
}
}
|