From a7492a3b101ed0e119bf3315ff4dfb63a8f3fe01 Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Sun, 9 Apr 2017 18:18:22 -0700 Subject: Formatting --- .../src/main/scala/io/crashbox/ci/Builder.scala | 107 ++++++++++----------- .../main/scala/io/crashbox/ci/DockerExecutor.scala | 5 +- .../src/main/scala/io/crashbox/ci/Executor.scala | 11 ++- .../test/scala/io/crashbox/ci/BuildStageSpec.scala | 40 ++++---- .../scala/io/crashbox/ci/DockerExecutorSpec.scala | 38 ++++---- .../src/test/scala/io/crashbox/ci/DockerUtil.scala | 3 +- .../src/test/scala/io/crashbox/ci/IOUtil.scala | 10 +- 7 files changed, 106 insertions(+), 108 deletions(-) diff --git a/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala b/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala index 77998cd..8ac7864 100644 --- a/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala +++ b/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala @@ -1,24 +1,19 @@ package io.crashbox.ci -import akka.actor.{ Actor, ActorLogging, ActorRef, ActorSystem } -import akka.stream.SourceShape -import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler, StageLogging } -import akka.stream.{ Attributes, FanInShape2, Inlet, Outlet } -import akka.stream.stage.{ GraphStage } -import java.io.{ File, OutputStream } -import java.util.concurrent.atomic.AtomicInteger -import scala.collection.concurrent.TrieMap -import scala.collection.mutable.{ HashMap, Queue } +import java.io.{File, OutputStream} + import scala.concurrent.Future -import scala.util.{ Failure, Success } +import scala.util.{Failure, Success} +import akka.stream.{Attributes, Outlet, SourceShape} +import akka.stream.stage.{GraphStage, GraphStageLogic, StageLogging} class BuildSource[Env <: Environment, Id <: ExecutionId]( - taskId: TaskId, - taskDef: TaskDef[Env], - executor: Executor[Env, Id], - mkdir: => File, - mkout: => OutputStream // TODO refactor this into a two-output stage + taskId: TaskId, + taskDef: TaskDef[Env], + executor: Executor[Env, Id], + mkdir: => File, + mkout: => OutputStream // TODO refactor this into a two-output stage ) extends GraphStage[SourceShape[Builder.BuildState]] { import Builder._ @@ -26,52 +21,54 @@ class BuildSource[Env <: Environment, Id <: ExecutionId]( val shape = new SourceShape(states) - def createLogic(attributes: Attributes) = new GraphStageLogic(shape) with StageLogging { - implicit def ec = materializer.executionContext + def createLogic(attributes: Attributes) = + new GraphStageLogic(shape) with StageLogging { + implicit def ec = materializer.executionContext - lazy val instance: Future[Id] = executor.start( - taskDef.environment, - taskDef.script, - mkdir, - mkout - ) + lazy val instance: Future[Id] = executor.start( + taskDef.environment, + taskDef.script, + mkdir, + mkout + ) - val changeState = getAsyncCallback[BuildState]{state => - log.info(s"${state.taskId} transitioned to $state") - emit(states, state) - } + val changeState = getAsyncCallback[BuildState] { state => + log.info(s"${state.taskId} transitioned to $state") + emit(states, state) + } - val asyncClose = getAsyncCallback[Unit]{_ => - completeStage() - } + val asyncClose = getAsyncCallback[Unit] { _ => + completeStage() + } - override def preStart() = { - val pipeline = for ( - _ <- Future.unit; - _ = changeState.invoke(TaskStarting(taskId, taskDef)); - eid <- instance; - _ = changeState.invoke(TaskRunning(taskId, eid)); - status <- executor.result(eid) - ) yield status - - pipeline onComplete { result => - result match { - case Success(status) => - changeState.invoke(TaskFinished(taskId, status)) - case Failure(err) => - changeState.invoke(TaskFailed(taskId, err.toString)) + override def preStart() = { + val pipeline = for (_ <- Future.unit; + _ = changeState.invoke( + TaskStarting(taskId, taskDef)); + eid <- instance; + _ = changeState.invoke(TaskRunning(taskId, eid)); + status <- executor.result(eid)) yield status + + pipeline onComplete { result => + result match { + case Success(status) => + changeState.invoke(TaskFinished(taskId, status)) + case Failure(err) => + changeState.invoke(TaskFailed(taskId, err.toString)) + } + asyncClose.invoke(()) } - asyncClose.invoke(()) } - } - override def postStop() = { - instance.foreach{ eid => executor.stop(eid)} - } + override def postStop() = { + instance.foreach { eid => + executor.stop(eid) + } + } - setHandler(states, GraphStageLogic.IgnoreTerminateOutput) + setHandler(states, GraphStageLogic.IgnoreTerminateOutput) - } + } } object Builder { @@ -79,8 +76,10 @@ object Builder { sealed trait BuildState { def taskId: TaskId } - case class TaskStarting(taskId: TaskId, taskDef: TaskDef[Environment]) extends BuildState - case class TaskRunning(taskId: TaskId, execId: ExecutionId) extends BuildState + case class TaskStarting(taskId: TaskId, taskDef: TaskDef[Environment]) + extends BuildState + case class TaskRunning(taskId: TaskId, execId: ExecutionId) + extends BuildState case class TaskFinished(taskId: TaskId, status: Int) extends BuildState case class TaskFailed(taskId: TaskId, message: String) extends BuildState diff --git a/crashboxd/src/main/scala/io/crashbox/ci/DockerExecutor.scala b/crashboxd/src/main/scala/io/crashbox/ci/DockerExecutor.scala index 25a7f2e..d7678e5 100644 --- a/crashboxd/src/main/scala/io/crashbox/ci/DockerExecutor.scala +++ b/crashboxd/src/main/scala/io/crashbox/ci/DockerExecutor.scala @@ -13,13 +13,11 @@ import com.spotify.docker.client.DockerClient.{ AttachParameter, ListContainersParam } -import scala.util.Random import com.spotify.docker.client.LogStream import com.spotify.docker.client.exceptions.ContainerNotFoundException import com.spotify.docker.client.messages.{ContainerConfig, HostConfig} import com.spotify.docker.client.messages.HostConfig.Bind - object DockerExecutor { def containerUser = "crashbox" @@ -29,7 +27,8 @@ object DockerExecutor { } class DockerExecutor(uri: String = "unix:///run/docker.sock")( - implicit system: ActorSystem) extends Executor[DockerEnvironment, DockerExecutionId] { + implicit system: ActorSystem) + extends Executor[DockerEnvironment, DockerExecutionId] { import DockerExecutor._ import system.log diff --git a/crashboxd/src/main/scala/io/crashbox/ci/Executor.scala b/crashboxd/src/main/scala/io/crashbox/ci/Executor.scala index 92a0fc7..51274e8 100644 --- a/crashboxd/src/main/scala/io/crashbox/ci/Executor.scala +++ b/crashboxd/src/main/scala/io/crashbox/ci/Executor.scala @@ -1,15 +1,16 @@ package io.crashbox.ci -import java.io.{ File, OutputStream } +import java.io.{File, OutputStream} + import scala.concurrent.Future trait Executor[Env <: Environment, Id <: ExecutionId] { def start( - environment: Env, - script: String, - buildDirectory: File, - out: OutputStream + environment: Env, + script: String, + buildDirectory: File, + out: OutputStream ): Future[Id] def result(id: Id): Future[Int] diff --git a/crashboxd/src/test/scala/io/crashbox/ci/BuildStageSpec.scala b/crashboxd/src/test/scala/io/crashbox/ci/BuildStageSpec.scala index dd79f72..033fe52 100644 --- a/crashboxd/src/test/scala/io/crashbox/ci/BuildStageSpec.scala +++ b/crashboxd/src/test/scala/io/crashbox/ci/BuildStageSpec.scala @@ -1,18 +1,16 @@ package io.crashbox.ci +import java.io.{ByteArrayOutputStream, File, OutputStream} + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ + +import Builder._ import akka.NotUsed import akka.actor.ActorSystem -import akka.stream.scaladsl.Keep -import akka.stream.{ ClosedShape, KillSwitch } -import akka.stream.scaladsl.{ GraphDSL, RunnableGraph, Sink, Source } -import akka.stream.{ ActorMaterializer, FanInShape2 } -import java.io.{ ByteArrayOutputStream, File, OutputStream } -import java.nio.file.Files +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source} import org.scalatest._ -import scala.concurrent.{ Await, Future } -import scala.concurrent.duration._ -import Builder._ - class BuildStageSpec extends FlatSpec with Matchers with BeforeAndAfterAll { @@ -23,13 +21,13 @@ class BuildStageSpec extends FlatSpec with Matchers with BeforeAndAfterAll { case class DummyEnv() extends Environment { val id = DummyId() } - + case class DummyId() extends ExecutionId { import DummyId._ private var _state: State = Starting - def state = _state.synchronized{ _state} - def state_=(value: State) = _state.synchronized{_state = value} + def state = _state.synchronized { _state } + def state_=(value: State) = _state.synchronized { _state = value } } object DummyId { @@ -41,12 +39,15 @@ class BuildStageSpec extends FlatSpec with Matchers with BeforeAndAfterAll { } class DummyExecutor( - startDelay: Duration = 0.seconds, - resultDelay: Duration = 0.seconds, - stopDelay: Duration = 0.seconds + startDelay: Duration = 0.seconds, + resultDelay: Duration = 0.seconds, + stopDelay: Duration = 0.seconds ) extends Executor[DummyEnv, DummyId] { - override def start(env: DummyEnv, script: String, dir: File, out: OutputStream) = Future { + override def start(env: DummyEnv, + script: String, + dir: File, + out: OutputStream) = Future { Thread.sleep(startDelay.toMillis) env.id.state = DummyId.Running env.id @@ -66,8 +67,8 @@ class BuildStageSpec extends FlatSpec with Matchers with BeforeAndAfterAll { } def dummySource( - executor: DummyExecutor, - env: DummyEnv + executor: DummyExecutor, + env: DummyEnv ): Source[BuildState, NotUsed] = { val stage = new BuildSource( TaskId("dummy", 0), @@ -109,7 +110,6 @@ class BuildStageSpec extends FlatSpec with Matchers with BeforeAndAfterAll { Thread.sleep(delay.toMillis) assert(env.id.state == DummyId.Stopped) - val expectedEvents = Seq( TaskStarting(taskId, taskDef), TaskRunning(taskId, env.id), diff --git a/crashboxd/src/test/scala/io/crashbox/ci/DockerExecutorSpec.scala b/crashboxd/src/test/scala/io/crashbox/ci/DockerExecutorSpec.scala index 6794908..7d4dc79 100644 --- a/crashboxd/src/test/scala/io/crashbox/ci/DockerExecutorSpec.scala +++ b/crashboxd/src/test/scala/io/crashbox/ci/DockerExecutorSpec.scala @@ -1,8 +1,6 @@ package io.crashbox.ci -import com.spotify.docker.client.DockerClient -import com.spotify.docker.client.DockerClient.ListContainersParam -import java.io.{ByteArrayOutputStream, File} +import java.io.File import java.nio.file.Files import scala.collection.JavaConverters._ @@ -11,8 +9,6 @@ import scala.concurrent.duration._ import akka.actor.ActorSystem import org.scalatest._ -import scala.util.Random - class DockerExecutorSpec extends FlatSpec @@ -45,13 +41,12 @@ class DockerExecutorSpec def run[A](script: String)(tests: (Int, File, String) => A): A = withTemp { case (dir, out) => - - val awaitable = for (id <- exec.start(env, script, dir, out); - status <- exec.result(id)) yield { - status - } - val status = Await.result(awaitable, timeout) - tests(status, dir, new String(out.toByteArray()).trim()) + val awaitable = for (id <- exec.start(env, script, dir, out); + status <- exec.result(id)) yield { + status + } + val status = Await.result(awaitable, timeout) + tests(status, dir, new String(out.toByteArray()).trim()) } "DockerExecutor" should "return expected exit codes" in { @@ -97,15 +92,16 @@ class DockerExecutorSpec } it should "allow cancellations" in { - withTemp { case (dir, out) => - val script = "while true; do sleep 1; echo sleeping; done" - - val id = Await.result(exec.start(env, script, dir, out), timeout) - val check = exec.result(id).map { res => - assert(res == 137) - } - exec.stop(id) - Await.result(check, timeout) + withTemp { + case (dir, out) => + val script = "while true; do sleep 1; echo sleeping; done" + + val id = Await.result(exec.start(env, script, dir, out), timeout) + val check = exec.result(id).map { res => + assert(res == 137) + } + exec.stop(id) + Await.result(check, timeout) } } diff --git a/crashboxd/src/test/scala/io/crashbox/ci/DockerUtil.scala b/crashboxd/src/test/scala/io/crashbox/ci/DockerUtil.scala index 794a390..969245f 100644 --- a/crashboxd/src/test/scala/io/crashbox/ci/DockerUtil.scala +++ b/crashboxd/src/test/scala/io/crashbox/ci/DockerUtil.scala @@ -1,7 +1,8 @@ package io.crashbox.ci +import IOUtil._ + object DockerUtil { - import IOUtil._ import com.spotify.docker.client.DockerClient import java.io.File import java.nio.file.Files diff --git a/crashboxd/src/test/scala/io/crashbox/ci/IOUtil.scala b/crashboxd/src/test/scala/io/crashbox/ci/IOUtil.scala index 034c6a3..82c0840 100644 --- a/crashboxd/src/test/scala/io/crashbox/ci/IOUtil.scala +++ b/crashboxd/src/test/scala/io/crashbox/ci/IOUtil.scala @@ -1,13 +1,13 @@ package io.crashbox.ci -import java.io.{ ByteArrayOutputStream, File } +import java.io.{ByteArrayOutputStream, File} import java.nio.file.Files object IOUtil { def withTempDir[A](action: File => A): A = { def rm(parent: File): Unit = if (parent.isDirectory) { - parent.listFiles.foreach{ child => + parent.listFiles.foreach { child => rm(child) } } @@ -16,13 +16,15 @@ object IOUtil { finally rm(dir) } - def withTempStream[A](action: ByteArrayOutputStream => A, size: Int = 1024): A = { + def withTempStream[A](action: ByteArrayOutputStream => A, + size: Int = 1024): A = { val out = new ByteArrayOutputStream(size) try action(out) finally out.close() } - def withTemp[A](action: (File, ByteArrayOutputStream) => A, size: Int = 1024): A = { + def withTemp[A](action: (File, ByteArrayOutputStream) => A, + size: Int = 1024): A = { withTempDir { d => withTempStream { s => action(d, s) -- cgit v1.2.3