summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJakob Odersky <jakob@odersky.com>2017-04-09 18:18:22 -0700
committerJakob Odersky <jakob@odersky.com>2017-04-09 18:18:22 -0700
commita7492a3b101ed0e119bf3315ff4dfb63a8f3fe01 (patch)
tree14b07d84ff8f5bb03fc909cd550d6c726a27be5c
parent30c3990d25293c01a380fc8715679e7f618a4cae (diff)
downloadcrashbox-ci-a7492a3b101ed0e119bf3315ff4dfb63a8f3fe01.tar.gz
crashbox-ci-a7492a3b101ed0e119bf3315ff4dfb63a8f3fe01.tar.bz2
crashbox-ci-a7492a3b101ed0e119bf3315ff4dfb63a8f3fe01.zip
FormattingHEADmaster
-rw-r--r--crashboxd/src/main/scala/io/crashbox/ci/Builder.scala107
-rw-r--r--crashboxd/src/main/scala/io/crashbox/ci/DockerExecutor.scala5
-rw-r--r--crashboxd/src/main/scala/io/crashbox/ci/Executor.scala11
-rw-r--r--crashboxd/src/test/scala/io/crashbox/ci/BuildStageSpec.scala40
-rw-r--r--crashboxd/src/test/scala/io/crashbox/ci/DockerExecutorSpec.scala38
-rw-r--r--crashboxd/src/test/scala/io/crashbox/ci/DockerUtil.scala3
-rw-r--r--crashboxd/src/test/scala/io/crashbox/ci/IOUtil.scala10
7 files changed, 106 insertions, 108 deletions
diff --git a/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala b/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala
index 77998cd..8ac7864 100644
--- a/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala
+++ b/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala
@@ -1,24 +1,19 @@
package io.crashbox.ci
-import akka.actor.{ Actor, ActorLogging, ActorRef, ActorSystem }
-import akka.stream.SourceShape
-import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler, StageLogging }
-import akka.stream.{ Attributes, FanInShape2, Inlet, Outlet }
-import akka.stream.stage.{ GraphStage }
-import java.io.{ File, OutputStream }
-import java.util.concurrent.atomic.AtomicInteger
-import scala.collection.concurrent.TrieMap
-import scala.collection.mutable.{ HashMap, Queue }
+import java.io.{File, OutputStream}
+
import scala.concurrent.Future
-import scala.util.{ Failure, Success }
+import scala.util.{Failure, Success}
+import akka.stream.{Attributes, Outlet, SourceShape}
+import akka.stream.stage.{GraphStage, GraphStageLogic, StageLogging}
class BuildSource[Env <: Environment, Id <: ExecutionId](
- taskId: TaskId,
- taskDef: TaskDef[Env],
- executor: Executor[Env, Id],
- mkdir: => File,
- mkout: => OutputStream // TODO refactor this into a two-output stage
+ taskId: TaskId,
+ taskDef: TaskDef[Env],
+ executor: Executor[Env, Id],
+ mkdir: => File,
+ mkout: => OutputStream // TODO refactor this into a two-output stage
) extends GraphStage[SourceShape[Builder.BuildState]] {
import Builder._
@@ -26,52 +21,54 @@ class BuildSource[Env <: Environment, Id <: ExecutionId](
val shape = new SourceShape(states)
- def createLogic(attributes: Attributes) = new GraphStageLogic(shape) with StageLogging {
- implicit def ec = materializer.executionContext
+ def createLogic(attributes: Attributes) =
+ new GraphStageLogic(shape) with StageLogging {
+ implicit def ec = materializer.executionContext
- lazy val instance: Future[Id] = executor.start(
- taskDef.environment,
- taskDef.script,
- mkdir,
- mkout
- )
+ lazy val instance: Future[Id] = executor.start(
+ taskDef.environment,
+ taskDef.script,
+ mkdir,
+ mkout
+ )
- val changeState = getAsyncCallback[BuildState]{state =>
- log.info(s"${state.taskId} transitioned to $state")
- emit(states, state)
- }
+ val changeState = getAsyncCallback[BuildState] { state =>
+ log.info(s"${state.taskId} transitioned to $state")
+ emit(states, state)
+ }
- val asyncClose = getAsyncCallback[Unit]{_ =>
- completeStage()
- }
+ val asyncClose = getAsyncCallback[Unit] { _ =>
+ completeStage()
+ }
- override def preStart() = {
- val pipeline = for (
- _ <- Future.unit;
- _ = changeState.invoke(TaskStarting(taskId, taskDef));
- eid <- instance;
- _ = changeState.invoke(TaskRunning(taskId, eid));
- status <- executor.result(eid)
- ) yield status
-
- pipeline onComplete { result =>
- result match {
- case Success(status) =>
- changeState.invoke(TaskFinished(taskId, status))
- case Failure(err) =>
- changeState.invoke(TaskFailed(taskId, err.toString))
+ override def preStart() = {
+ val pipeline = for (_ <- Future.unit;
+ _ = changeState.invoke(
+ TaskStarting(taskId, taskDef));
+ eid <- instance;
+ _ = changeState.invoke(TaskRunning(taskId, eid));
+ status <- executor.result(eid)) yield status
+
+ pipeline onComplete { result =>
+ result match {
+ case Success(status) =>
+ changeState.invoke(TaskFinished(taskId, status))
+ case Failure(err) =>
+ changeState.invoke(TaskFailed(taskId, err.toString))
+ }
+ asyncClose.invoke(())
}
- asyncClose.invoke(())
}
- }
- override def postStop() = {
- instance.foreach{ eid => executor.stop(eid)}
- }
+ override def postStop() = {
+ instance.foreach { eid =>
+ executor.stop(eid)
+ }
+ }
- setHandler(states, GraphStageLogic.IgnoreTerminateOutput)
+ setHandler(states, GraphStageLogic.IgnoreTerminateOutput)
- }
+ }
}
object Builder {
@@ -79,8 +76,10 @@ object Builder {
sealed trait BuildState {
def taskId: TaskId
}
- case class TaskStarting(taskId: TaskId, taskDef: TaskDef[Environment]) extends BuildState
- case class TaskRunning(taskId: TaskId, execId: ExecutionId) extends BuildState
+ case class TaskStarting(taskId: TaskId, taskDef: TaskDef[Environment])
+ extends BuildState
+ case class TaskRunning(taskId: TaskId, execId: ExecutionId)
+ extends BuildState
case class TaskFinished(taskId: TaskId, status: Int) extends BuildState
case class TaskFailed(taskId: TaskId, message: String) extends BuildState
diff --git a/crashboxd/src/main/scala/io/crashbox/ci/DockerExecutor.scala b/crashboxd/src/main/scala/io/crashbox/ci/DockerExecutor.scala
index 25a7f2e..d7678e5 100644
--- a/crashboxd/src/main/scala/io/crashbox/ci/DockerExecutor.scala
+++ b/crashboxd/src/main/scala/io/crashbox/ci/DockerExecutor.scala
@@ -13,13 +13,11 @@ import com.spotify.docker.client.DockerClient.{
AttachParameter,
ListContainersParam
}
-import scala.util.Random
import com.spotify.docker.client.LogStream
import com.spotify.docker.client.exceptions.ContainerNotFoundException
import com.spotify.docker.client.messages.{ContainerConfig, HostConfig}
import com.spotify.docker.client.messages.HostConfig.Bind
-
object DockerExecutor {
def containerUser = "crashbox"
@@ -29,7 +27,8 @@ object DockerExecutor {
}
class DockerExecutor(uri: String = "unix:///run/docker.sock")(
- implicit system: ActorSystem) extends Executor[DockerEnvironment, DockerExecutionId] {
+ implicit system: ActorSystem)
+ extends Executor[DockerEnvironment, DockerExecutionId] {
import DockerExecutor._
import system.log
diff --git a/crashboxd/src/main/scala/io/crashbox/ci/Executor.scala b/crashboxd/src/main/scala/io/crashbox/ci/Executor.scala
index 92a0fc7..51274e8 100644
--- a/crashboxd/src/main/scala/io/crashbox/ci/Executor.scala
+++ b/crashboxd/src/main/scala/io/crashbox/ci/Executor.scala
@@ -1,15 +1,16 @@
package io.crashbox.ci
-import java.io.{ File, OutputStream }
+import java.io.{File, OutputStream}
+
import scala.concurrent.Future
trait Executor[Env <: Environment, Id <: ExecutionId] {
def start(
- environment: Env,
- script: String,
- buildDirectory: File,
- out: OutputStream
+ environment: Env,
+ script: String,
+ buildDirectory: File,
+ out: OutputStream
): Future[Id]
def result(id: Id): Future[Int]
diff --git a/crashboxd/src/test/scala/io/crashbox/ci/BuildStageSpec.scala b/crashboxd/src/test/scala/io/crashbox/ci/BuildStageSpec.scala
index dd79f72..033fe52 100644
--- a/crashboxd/src/test/scala/io/crashbox/ci/BuildStageSpec.scala
+++ b/crashboxd/src/test/scala/io/crashbox/ci/BuildStageSpec.scala
@@ -1,18 +1,16 @@
package io.crashbox.ci
+import java.io.{ByteArrayOutputStream, File, OutputStream}
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+
+import Builder._
import akka.NotUsed
import akka.actor.ActorSystem
-import akka.stream.scaladsl.Keep
-import akka.stream.{ ClosedShape, KillSwitch }
-import akka.stream.scaladsl.{ GraphDSL, RunnableGraph, Sink, Source }
-import akka.stream.{ ActorMaterializer, FanInShape2 }
-import java.io.{ ByteArrayOutputStream, File, OutputStream }
-import java.nio.file.Files
+import akka.stream.ActorMaterializer
+import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}
import org.scalatest._
-import scala.concurrent.{ Await, Future }
-import scala.concurrent.duration._
-import Builder._
-
class BuildStageSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
@@ -23,13 +21,13 @@ class BuildStageSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
case class DummyEnv() extends Environment {
val id = DummyId()
}
-
+
case class DummyId() extends ExecutionId {
import DummyId._
private var _state: State = Starting
- def state = _state.synchronized{ _state}
- def state_=(value: State) = _state.synchronized{_state = value}
+ def state = _state.synchronized { _state }
+ def state_=(value: State) = _state.synchronized { _state = value }
}
object DummyId {
@@ -41,12 +39,15 @@ class BuildStageSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
}
class DummyExecutor(
- startDelay: Duration = 0.seconds,
- resultDelay: Duration = 0.seconds,
- stopDelay: Duration = 0.seconds
+ startDelay: Duration = 0.seconds,
+ resultDelay: Duration = 0.seconds,
+ stopDelay: Duration = 0.seconds
) extends Executor[DummyEnv, DummyId] {
- override def start(env: DummyEnv, script: String, dir: File, out: OutputStream) = Future {
+ override def start(env: DummyEnv,
+ script: String,
+ dir: File,
+ out: OutputStream) = Future {
Thread.sleep(startDelay.toMillis)
env.id.state = DummyId.Running
env.id
@@ -66,8 +67,8 @@ class BuildStageSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
}
def dummySource(
- executor: DummyExecutor,
- env: DummyEnv
+ executor: DummyExecutor,
+ env: DummyEnv
): Source[BuildState, NotUsed] = {
val stage = new BuildSource(
TaskId("dummy", 0),
@@ -109,7 +110,6 @@ class BuildStageSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
Thread.sleep(delay.toMillis)
assert(env.id.state == DummyId.Stopped)
-
val expectedEvents = Seq(
TaskStarting(taskId, taskDef),
TaskRunning(taskId, env.id),
diff --git a/crashboxd/src/test/scala/io/crashbox/ci/DockerExecutorSpec.scala b/crashboxd/src/test/scala/io/crashbox/ci/DockerExecutorSpec.scala
index 6794908..7d4dc79 100644
--- a/crashboxd/src/test/scala/io/crashbox/ci/DockerExecutorSpec.scala
+++ b/crashboxd/src/test/scala/io/crashbox/ci/DockerExecutorSpec.scala
@@ -1,8 +1,6 @@
package io.crashbox.ci
-import com.spotify.docker.client.DockerClient
-import com.spotify.docker.client.DockerClient.ListContainersParam
-import java.io.{ByteArrayOutputStream, File}
+import java.io.File
import java.nio.file.Files
import scala.collection.JavaConverters._
@@ -11,8 +9,6 @@ import scala.concurrent.duration._
import akka.actor.ActorSystem
import org.scalatest._
-import scala.util.Random
-
class DockerExecutorSpec
extends FlatSpec
@@ -45,13 +41,12 @@ class DockerExecutorSpec
def run[A](script: String)(tests: (Int, File, String) => A): A = withTemp {
case (dir, out) =>
-
- val awaitable = for (id <- exec.start(env, script, dir, out);
- status <- exec.result(id)) yield {
- status
- }
- val status = Await.result(awaitable, timeout)
- tests(status, dir, new String(out.toByteArray()).trim())
+ val awaitable = for (id <- exec.start(env, script, dir, out);
+ status <- exec.result(id)) yield {
+ status
+ }
+ val status = Await.result(awaitable, timeout)
+ tests(status, dir, new String(out.toByteArray()).trim())
}
"DockerExecutor" should "return expected exit codes" in {
@@ -97,15 +92,16 @@ class DockerExecutorSpec
}
it should "allow cancellations" in {
- withTemp { case (dir, out) =>
- val script = "while true; do sleep 1; echo sleeping; done"
-
- val id = Await.result(exec.start(env, script, dir, out), timeout)
- val check = exec.result(id).map { res =>
- assert(res == 137)
- }
- exec.stop(id)
- Await.result(check, timeout)
+ withTemp {
+ case (dir, out) =>
+ val script = "while true; do sleep 1; echo sleeping; done"
+
+ val id = Await.result(exec.start(env, script, dir, out), timeout)
+ val check = exec.result(id).map { res =>
+ assert(res == 137)
+ }
+ exec.stop(id)
+ Await.result(check, timeout)
}
}
diff --git a/crashboxd/src/test/scala/io/crashbox/ci/DockerUtil.scala b/crashboxd/src/test/scala/io/crashbox/ci/DockerUtil.scala
index 794a390..969245f 100644
--- a/crashboxd/src/test/scala/io/crashbox/ci/DockerUtil.scala
+++ b/crashboxd/src/test/scala/io/crashbox/ci/DockerUtil.scala
@@ -1,7 +1,8 @@
package io.crashbox.ci
+import IOUtil._
+
object DockerUtil {
- import IOUtil._
import com.spotify.docker.client.DockerClient
import java.io.File
import java.nio.file.Files
diff --git a/crashboxd/src/test/scala/io/crashbox/ci/IOUtil.scala b/crashboxd/src/test/scala/io/crashbox/ci/IOUtil.scala
index 034c6a3..82c0840 100644
--- a/crashboxd/src/test/scala/io/crashbox/ci/IOUtil.scala
+++ b/crashboxd/src/test/scala/io/crashbox/ci/IOUtil.scala
@@ -1,13 +1,13 @@
package io.crashbox.ci
-import java.io.{ ByteArrayOutputStream, File }
+import java.io.{ByteArrayOutputStream, File}
import java.nio.file.Files
object IOUtil {
def withTempDir[A](action: File => A): A = {
def rm(parent: File): Unit = if (parent.isDirectory) {
- parent.listFiles.foreach{ child =>
+ parent.listFiles.foreach { child =>
rm(child)
}
}
@@ -16,13 +16,15 @@ object IOUtil {
finally rm(dir)
}
- def withTempStream[A](action: ByteArrayOutputStream => A, size: Int = 1024): A = {
+ def withTempStream[A](action: ByteArrayOutputStream => A,
+ size: Int = 1024): A = {
val out = new ByteArrayOutputStream(size)
try action(out)
finally out.close()
}
- def withTemp[A](action: (File, ByteArrayOutputStream) => A, size: Int = 1024): A = {
+ def withTemp[A](action: (File, ByteArrayOutputStream) => A,
+ size: Int = 1024): A = {
withTempDir { d =>
withTempStream { s =>
action(d, s)