summaryrefslogtreecommitdiff
path: root/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala
diff options
context:
space:
mode:
Diffstat (limited to 'crashboxd/src/main/scala/io/crashbox/ci/Builder.scala')
-rw-r--r--crashboxd/src/main/scala/io/crashbox/ci/Builder.scala107
1 files changed, 53 insertions, 54 deletions
diff --git a/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala b/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala
index 77998cd..8ac7864 100644
--- a/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala
+++ b/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala
@@ -1,24 +1,19 @@
package io.crashbox.ci
-import akka.actor.{ Actor, ActorLogging, ActorRef, ActorSystem }
-import akka.stream.SourceShape
-import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler, StageLogging }
-import akka.stream.{ Attributes, FanInShape2, Inlet, Outlet }
-import akka.stream.stage.{ GraphStage }
-import java.io.{ File, OutputStream }
-import java.util.concurrent.atomic.AtomicInteger
-import scala.collection.concurrent.TrieMap
-import scala.collection.mutable.{ HashMap, Queue }
+import java.io.{File, OutputStream}
+
import scala.concurrent.Future
-import scala.util.{ Failure, Success }
+import scala.util.{Failure, Success}
+import akka.stream.{Attributes, Outlet, SourceShape}
+import akka.stream.stage.{GraphStage, GraphStageLogic, StageLogging}
class BuildSource[Env <: Environment, Id <: ExecutionId](
- taskId: TaskId,
- taskDef: TaskDef[Env],
- executor: Executor[Env, Id],
- mkdir: => File,
- mkout: => OutputStream // TODO refactor this into a two-output stage
+ taskId: TaskId,
+ taskDef: TaskDef[Env],
+ executor: Executor[Env, Id],
+ mkdir: => File,
+ mkout: => OutputStream // TODO refactor this into a two-output stage
) extends GraphStage[SourceShape[Builder.BuildState]] {
import Builder._
@@ -26,52 +21,54 @@ class BuildSource[Env <: Environment, Id <: ExecutionId](
val shape = new SourceShape(states)
- def createLogic(attributes: Attributes) = new GraphStageLogic(shape) with StageLogging {
- implicit def ec = materializer.executionContext
+ def createLogic(attributes: Attributes) =
+ new GraphStageLogic(shape) with StageLogging {
+ implicit def ec = materializer.executionContext
- lazy val instance: Future[Id] = executor.start(
- taskDef.environment,
- taskDef.script,
- mkdir,
- mkout
- )
+ lazy val instance: Future[Id] = executor.start(
+ taskDef.environment,
+ taskDef.script,
+ mkdir,
+ mkout
+ )
- val changeState = getAsyncCallback[BuildState]{state =>
- log.info(s"${state.taskId} transitioned to $state")
- emit(states, state)
- }
+ val changeState = getAsyncCallback[BuildState] { state =>
+ log.info(s"${state.taskId} transitioned to $state")
+ emit(states, state)
+ }
- val asyncClose = getAsyncCallback[Unit]{_ =>
- completeStage()
- }
+ val asyncClose = getAsyncCallback[Unit] { _ =>
+ completeStage()
+ }
- override def preStart() = {
- val pipeline = for (
- _ <- Future.unit;
- _ = changeState.invoke(TaskStarting(taskId, taskDef));
- eid <- instance;
- _ = changeState.invoke(TaskRunning(taskId, eid));
- status <- executor.result(eid)
- ) yield status
-
- pipeline onComplete { result =>
- result match {
- case Success(status) =>
- changeState.invoke(TaskFinished(taskId, status))
- case Failure(err) =>
- changeState.invoke(TaskFailed(taskId, err.toString))
+ override def preStart() = {
+ val pipeline = for (_ <- Future.unit;
+ _ = changeState.invoke(
+ TaskStarting(taskId, taskDef));
+ eid <- instance;
+ _ = changeState.invoke(TaskRunning(taskId, eid));
+ status <- executor.result(eid)) yield status
+
+ pipeline onComplete { result =>
+ result match {
+ case Success(status) =>
+ changeState.invoke(TaskFinished(taskId, status))
+ case Failure(err) =>
+ changeState.invoke(TaskFailed(taskId, err.toString))
+ }
+ asyncClose.invoke(())
}
- asyncClose.invoke(())
}
- }
- override def postStop() = {
- instance.foreach{ eid => executor.stop(eid)}
- }
+ override def postStop() = {
+ instance.foreach { eid =>
+ executor.stop(eid)
+ }
+ }
- setHandler(states, GraphStageLogic.IgnoreTerminateOutput)
+ setHandler(states, GraphStageLogic.IgnoreTerminateOutput)
- }
+ }
}
object Builder {
@@ -79,8 +76,10 @@ object Builder {
sealed trait BuildState {
def taskId: TaskId
}
- case class TaskStarting(taskId: TaskId, taskDef: TaskDef[Environment]) extends BuildState
- case class TaskRunning(taskId: TaskId, execId: ExecutionId) extends BuildState
+ case class TaskStarting(taskId: TaskId, taskDef: TaskDef[Environment])
+ extends BuildState
+ case class TaskRunning(taskId: TaskId, execId: ExecutionId)
+ extends BuildState
case class TaskFinished(taskId: TaskId, status: Int) extends BuildState
case class TaskFailed(taskId: TaskId, message: String) extends BuildState