From a7492a3b101ed0e119bf3315ff4dfb63a8f3fe01 Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Sun, 9 Apr 2017 18:18:22 -0700 Subject: Formatting --- .../src/main/scala/io/crashbox/ci/Builder.scala | 107 ++++++++++----------- 1 file changed, 53 insertions(+), 54 deletions(-) (limited to 'crashboxd/src/main/scala/io/crashbox/ci/Builder.scala') diff --git a/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala b/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala index 77998cd..8ac7864 100644 --- a/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala +++ b/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala @@ -1,24 +1,19 @@ package io.crashbox.ci -import akka.actor.{ Actor, ActorLogging, ActorRef, ActorSystem } -import akka.stream.SourceShape -import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler, StageLogging } -import akka.stream.{ Attributes, FanInShape2, Inlet, Outlet } -import akka.stream.stage.{ GraphStage } -import java.io.{ File, OutputStream } -import java.util.concurrent.atomic.AtomicInteger -import scala.collection.concurrent.TrieMap -import scala.collection.mutable.{ HashMap, Queue } +import java.io.{File, OutputStream} + import scala.concurrent.Future -import scala.util.{ Failure, Success } +import scala.util.{Failure, Success} +import akka.stream.{Attributes, Outlet, SourceShape} +import akka.stream.stage.{GraphStage, GraphStageLogic, StageLogging} class BuildSource[Env <: Environment, Id <: ExecutionId]( - taskId: TaskId, - taskDef: TaskDef[Env], - executor: Executor[Env, Id], - mkdir: => File, - mkout: => OutputStream // TODO refactor this into a two-output stage + taskId: TaskId, + taskDef: TaskDef[Env], + executor: Executor[Env, Id], + mkdir: => File, + mkout: => OutputStream // TODO refactor this into a two-output stage ) extends GraphStage[SourceShape[Builder.BuildState]] { import Builder._ @@ -26,52 +21,54 @@ class BuildSource[Env <: Environment, Id <: ExecutionId]( val shape = new SourceShape(states) - def createLogic(attributes: Attributes) = new GraphStageLogic(shape) with StageLogging { - implicit def ec = materializer.executionContext + def createLogic(attributes: Attributes) = + new GraphStageLogic(shape) with StageLogging { + implicit def ec = materializer.executionContext - lazy val instance: Future[Id] = executor.start( - taskDef.environment, - taskDef.script, - mkdir, - mkout - ) + lazy val instance: Future[Id] = executor.start( + taskDef.environment, + taskDef.script, + mkdir, + mkout + ) - val changeState = getAsyncCallback[BuildState]{state => - log.info(s"${state.taskId} transitioned to $state") - emit(states, state) - } + val changeState = getAsyncCallback[BuildState] { state => + log.info(s"${state.taskId} transitioned to $state") + emit(states, state) + } - val asyncClose = getAsyncCallback[Unit]{_ => - completeStage() - } + val asyncClose = getAsyncCallback[Unit] { _ => + completeStage() + } - override def preStart() = { - val pipeline = for ( - _ <- Future.unit; - _ = changeState.invoke(TaskStarting(taskId, taskDef)); - eid <- instance; - _ = changeState.invoke(TaskRunning(taskId, eid)); - status <- executor.result(eid) - ) yield status - - pipeline onComplete { result => - result match { - case Success(status) => - changeState.invoke(TaskFinished(taskId, status)) - case Failure(err) => - changeState.invoke(TaskFailed(taskId, err.toString)) + override def preStart() = { + val pipeline = for (_ <- Future.unit; + _ = changeState.invoke( + TaskStarting(taskId, taskDef)); + eid <- instance; + _ = changeState.invoke(TaskRunning(taskId, eid)); + status <- executor.result(eid)) yield status + + pipeline onComplete { result => + result match { + case Success(status) => + changeState.invoke(TaskFinished(taskId, status)) + case Failure(err) => + changeState.invoke(TaskFailed(taskId, err.toString)) + } + asyncClose.invoke(()) } - asyncClose.invoke(()) } - } - override def postStop() = { - instance.foreach{ eid => executor.stop(eid)} - } + override def postStop() = { + instance.foreach { eid => + executor.stop(eid) + } + } - setHandler(states, GraphStageLogic.IgnoreTerminateOutput) + setHandler(states, GraphStageLogic.IgnoreTerminateOutput) - } + } } object Builder { @@ -79,8 +76,10 @@ object Builder { sealed trait BuildState { def taskId: TaskId } - case class TaskStarting(taskId: TaskId, taskDef: TaskDef[Environment]) extends BuildState - case class TaskRunning(taskId: TaskId, execId: ExecutionId) extends BuildState + case class TaskStarting(taskId: TaskId, taskDef: TaskDef[Environment]) + extends BuildState + case class TaskRunning(taskId: TaskId, execId: ExecutionId) + extends BuildState case class TaskFinished(taskId: TaskId, status: Int) extends BuildState case class TaskFailed(taskId: TaskId, message: String) extends BuildState -- cgit v1.2.3