1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
package io.crashbox.ci
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.Keep
import akka.stream.{ ClosedShape, KillSwitch }
import akka.stream.scaladsl.{ GraphDSL, RunnableGraph, Sink, Source }
import akka.stream.{ ActorMaterializer, FanInShape2 }
import java.io.{ ByteArrayOutputStream, File, OutputStream }
import java.nio.file.Files
import org.scalatest._
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
import Builder._
class BuildStageSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
implicit val system = ActorSystem("crashboxd-buildstage")
implicit val materializer = ActorMaterializer()
import system.dispatcher
case class DummyEnv() extends Environment {
val id = DummyId()
}
case class DummyId() extends ExecutionId {
import DummyId._
private var _state: State = Starting
def state = _state.synchronized{ _state}
def state_=(value: State) = _state.synchronized{_state = value}
}
object DummyId {
sealed trait State
case object Starting extends State
case object Running extends State
case object Result extends State
case object Stopped extends State
}
class DummyExecutor(
startDelay: Duration = 0.seconds,
resultDelay: Duration = 0.seconds,
stopDelay: Duration = 0.seconds
) extends Executor[DummyEnv, DummyId] {
override def start(env: DummyEnv, script: String, dir: File, out: OutputStream) = Future {
Thread.sleep(startDelay.toMillis)
env.id.state = DummyId.Running
env.id
}
override def result(id: DummyId) = Future {
Thread.sleep(resultDelay.toMillis)
id.state = DummyId.Result
0
}
override def stop(id: DummyId) = {
Thread.sleep(stopDelay.toMillis)
id.state = DummyId.Stopped
}
}
def dummySource(
executor: DummyExecutor,
env: DummyEnv
): Source[BuildState, NotUsed] = {
val stage = new BuildSource(
TaskId("dummy", 0),
TaskDef(env, ""),
executor,
new File("nonexistant"),
new ByteArrayOutputStream(0)
)
Source.fromGraph(stage)
}
"BuildStage" should "transition states and emit in the correct order" in {
val delay = 0.5.seconds
val executor = new DummyExecutor(delay, delay, delay)
val env = new DummyEnv()
val taskId = TaskId("dummy", 0)
val taskDef = TaskDef(env, "dummy script")
val stage = new BuildSource(
taskId,
taskDef,
executor,
new File("nonexistant"),
new ByteArrayOutputStream(0)
)
val source = Source.fromGraph(stage)
val eventFuture = source.toMat(Sink.seq)(Keep.right).run()
Thread.sleep((delay / 2).toMillis)
assert(env.id.state == DummyId.Starting)
Thread.sleep(delay.toMillis)
assert(env.id.state == DummyId.Running)
Thread.sleep(delay.toMillis)
assert(env.id.state == DummyId.Result)
Thread.sleep(delay.toMillis)
assert(env.id.state == DummyId.Stopped)
val expectedEvents = Seq(
TaskStarting(taskId, taskDef),
TaskRunning(taskId, env.id),
TaskFinished(taskId, 0)
)
val events = Await.result(eventFuture, 10.seconds)
assert(events.toList === expectedEvents.toList)
}
}
|