summaryrefslogtreecommitdiff
path: root/crashboxd/src/main/scala/io/crashbox
diff options
context:
space:
mode:
Diffstat (limited to 'crashboxd/src/main/scala/io/crashbox')
-rw-r--r--crashboxd/src/main/scala/io/crashbox/ci/Builder.scala107
-rw-r--r--crashboxd/src/main/scala/io/crashbox/ci/DockerExecutor.scala5
-rw-r--r--crashboxd/src/main/scala/io/crashbox/ci/Executor.scala11
3 files changed, 61 insertions, 62 deletions
diff --git a/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala b/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala
index 77998cd..8ac7864 100644
--- a/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala
+++ b/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala
@@ -1,24 +1,19 @@
package io.crashbox.ci
-import akka.actor.{ Actor, ActorLogging, ActorRef, ActorSystem }
-import akka.stream.SourceShape
-import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler, StageLogging }
-import akka.stream.{ Attributes, FanInShape2, Inlet, Outlet }
-import akka.stream.stage.{ GraphStage }
-import java.io.{ File, OutputStream }
-import java.util.concurrent.atomic.AtomicInteger
-import scala.collection.concurrent.TrieMap
-import scala.collection.mutable.{ HashMap, Queue }
+import java.io.{File, OutputStream}
+
import scala.concurrent.Future
-import scala.util.{ Failure, Success }
+import scala.util.{Failure, Success}
+import akka.stream.{Attributes, Outlet, SourceShape}
+import akka.stream.stage.{GraphStage, GraphStageLogic, StageLogging}
class BuildSource[Env <: Environment, Id <: ExecutionId](
- taskId: TaskId,
- taskDef: TaskDef[Env],
- executor: Executor[Env, Id],
- mkdir: => File,
- mkout: => OutputStream // TODO refactor this into a two-output stage
+ taskId: TaskId,
+ taskDef: TaskDef[Env],
+ executor: Executor[Env, Id],
+ mkdir: => File,
+ mkout: => OutputStream // TODO refactor this into a two-output stage
) extends GraphStage[SourceShape[Builder.BuildState]] {
import Builder._
@@ -26,52 +21,54 @@ class BuildSource[Env <: Environment, Id <: ExecutionId](
val shape = new SourceShape(states)
- def createLogic(attributes: Attributes) = new GraphStageLogic(shape) with StageLogging {
- implicit def ec = materializer.executionContext
+ def createLogic(attributes: Attributes) =
+ new GraphStageLogic(shape) with StageLogging {
+ implicit def ec = materializer.executionContext
- lazy val instance: Future[Id] = executor.start(
- taskDef.environment,
- taskDef.script,
- mkdir,
- mkout
- )
+ lazy val instance: Future[Id] = executor.start(
+ taskDef.environment,
+ taskDef.script,
+ mkdir,
+ mkout
+ )
- val changeState = getAsyncCallback[BuildState]{state =>
- log.info(s"${state.taskId} transitioned to $state")
- emit(states, state)
- }
+ val changeState = getAsyncCallback[BuildState] { state =>
+ log.info(s"${state.taskId} transitioned to $state")
+ emit(states, state)
+ }
- val asyncClose = getAsyncCallback[Unit]{_ =>
- completeStage()
- }
+ val asyncClose = getAsyncCallback[Unit] { _ =>
+ completeStage()
+ }
- override def preStart() = {
- val pipeline = for (
- _ <- Future.unit;
- _ = changeState.invoke(TaskStarting(taskId, taskDef));
- eid <- instance;
- _ = changeState.invoke(TaskRunning(taskId, eid));
- status <- executor.result(eid)
- ) yield status
-
- pipeline onComplete { result =>
- result match {
- case Success(status) =>
- changeState.invoke(TaskFinished(taskId, status))
- case Failure(err) =>
- changeState.invoke(TaskFailed(taskId, err.toString))
+ override def preStart() = {
+ val pipeline = for (_ <- Future.unit;
+ _ = changeState.invoke(
+ TaskStarting(taskId, taskDef));
+ eid <- instance;
+ _ = changeState.invoke(TaskRunning(taskId, eid));
+ status <- executor.result(eid)) yield status
+
+ pipeline onComplete { result =>
+ result match {
+ case Success(status) =>
+ changeState.invoke(TaskFinished(taskId, status))
+ case Failure(err) =>
+ changeState.invoke(TaskFailed(taskId, err.toString))
+ }
+ asyncClose.invoke(())
}
- asyncClose.invoke(())
}
- }
- override def postStop() = {
- instance.foreach{ eid => executor.stop(eid)}
- }
+ override def postStop() = {
+ instance.foreach { eid =>
+ executor.stop(eid)
+ }
+ }
- setHandler(states, GraphStageLogic.IgnoreTerminateOutput)
+ setHandler(states, GraphStageLogic.IgnoreTerminateOutput)
- }
+ }
}
object Builder {
@@ -79,8 +76,10 @@ object Builder {
sealed trait BuildState {
def taskId: TaskId
}
- case class TaskStarting(taskId: TaskId, taskDef: TaskDef[Environment]) extends BuildState
- case class TaskRunning(taskId: TaskId, execId: ExecutionId) extends BuildState
+ case class TaskStarting(taskId: TaskId, taskDef: TaskDef[Environment])
+ extends BuildState
+ case class TaskRunning(taskId: TaskId, execId: ExecutionId)
+ extends BuildState
case class TaskFinished(taskId: TaskId, status: Int) extends BuildState
case class TaskFailed(taskId: TaskId, message: String) extends BuildState
diff --git a/crashboxd/src/main/scala/io/crashbox/ci/DockerExecutor.scala b/crashboxd/src/main/scala/io/crashbox/ci/DockerExecutor.scala
index 25a7f2e..d7678e5 100644
--- a/crashboxd/src/main/scala/io/crashbox/ci/DockerExecutor.scala
+++ b/crashboxd/src/main/scala/io/crashbox/ci/DockerExecutor.scala
@@ -13,13 +13,11 @@ import com.spotify.docker.client.DockerClient.{
AttachParameter,
ListContainersParam
}
-import scala.util.Random
import com.spotify.docker.client.LogStream
import com.spotify.docker.client.exceptions.ContainerNotFoundException
import com.spotify.docker.client.messages.{ContainerConfig, HostConfig}
import com.spotify.docker.client.messages.HostConfig.Bind
-
object DockerExecutor {
def containerUser = "crashbox"
@@ -29,7 +27,8 @@ object DockerExecutor {
}
class DockerExecutor(uri: String = "unix:///run/docker.sock")(
- implicit system: ActorSystem) extends Executor[DockerEnvironment, DockerExecutionId] {
+ implicit system: ActorSystem)
+ extends Executor[DockerEnvironment, DockerExecutionId] {
import DockerExecutor._
import system.log
diff --git a/crashboxd/src/main/scala/io/crashbox/ci/Executor.scala b/crashboxd/src/main/scala/io/crashbox/ci/Executor.scala
index 92a0fc7..51274e8 100644
--- a/crashboxd/src/main/scala/io/crashbox/ci/Executor.scala
+++ b/crashboxd/src/main/scala/io/crashbox/ci/Executor.scala
@@ -1,15 +1,16 @@
package io.crashbox.ci
-import java.io.{ File, OutputStream }
+import java.io.{File, OutputStream}
+
import scala.concurrent.Future
trait Executor[Env <: Environment, Id <: ExecutionId] {
def start(
- environment: Env,
- script: String,
- buildDirectory: File,
- out: OutputStream
+ environment: Env,
+ script: String,
+ buildDirectory: File,
+ out: OutputStream
): Future[Id]
def result(id: Id): Future[Int]