summaryrefslogtreecommitdiff
path: root/crashbox-server/src/main/scala/io/crashbox/ci/Schedulers.scala
diff options
context:
space:
mode:
Diffstat (limited to 'crashbox-server/src/main/scala/io/crashbox/ci/Schedulers.scala')
-rw-r--r--crashbox-server/src/main/scala/io/crashbox/ci/Schedulers.scala68
1 files changed, 42 insertions, 26 deletions
diff --git a/crashbox-server/src/main/scala/io/crashbox/ci/Schedulers.scala b/crashbox-server/src/main/scala/io/crashbox/ci/Schedulers.scala
index c860932..e9f2a82 100644
--- a/crashbox-server/src/main/scala/io/crashbox/ci/Schedulers.scala
+++ b/crashbox-server/src/main/scala/io/crashbox/ci/Schedulers.scala
@@ -1,24 +1,31 @@
package io.crashbox.ci
-import akka.actor.{ Actor, ActorLogging, ActorRef, OneForOneStrategy, Props, Terminated }
+import akka.actor.{
+ Actor,
+ ActorLogging,
+ ActorRef,
+ OneForOneStrategy,
+ Props,
+ Terminated
+}
import akka.stream.stage.GraphStageLogic
-import akka.stream.{ Attributes, Outlet, SourceShape }
+import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.GraphStage
-import java.io.{ File, OutputStream }
+import java.io.{File, OutputStream}
import java.net.URL
import java.nio.file.Files
import java.util.Base64
import scala.collection.mutable.HashMap
-import scala.concurrent.{ Await, Future }
+import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.control.NonFatal
import akka.actor.SupervisorStrategy._
-import scala.util.{ Failure, Success }
-
+import scala.util.{Failure, Success}
trait Schedulers extends { self: Core with Source with Builders with Parsers =>
- private def newTempDir: File = Files.createTempDirectory("crashbox-run").toFile()
+ private def newTempDir: File =
+ Files.createTempDirectory("crashbox-run").toFile()
sealed trait BuildState
case class Cloning(url: URL) extends BuildState
@@ -31,10 +38,11 @@ trait Schedulers extends { self: Core with Source with Builders with Parsers =>
case class Failed(message: String) extends EndBuildState
class BuildManager(
- url: URL,
- openOut: () => OutputStream,
- update: BuildState => Unit
- ) extends Actor with ActorLogging {
+ url: URL,
+ openOut: () => OutputStream,
+ update: BuildState => Unit
+ ) extends Actor
+ with ActorLogging {
var buildDir: Option[File] = None
var out: Option[OutputStream] = None
@@ -53,7 +61,7 @@ trait Schedulers extends { self: Core with Source with Builders with Parsers =>
override def receive: Receive = {
- case state@Cloning(url) =>
+ case state @ Cloning(url) =>
log.debug("Update build state: cloning")
update(state)
fetchSource(url, newTempDir) onComplete {
@@ -63,7 +71,7 @@ trait Schedulers extends { self: Core with Source with Builders with Parsers =>
self ! Failed(s"Error fetching source from $url")
}
- case state@Parsing(src) =>
+ case state @ Parsing(src) =>
log.debug("Update build state: parsing")
update(state)
buildDir = Some(src)
@@ -74,7 +82,7 @@ trait Schedulers extends { self: Core with Source with Builders with Parsers =>
self ! Failed(s"Failed to parse build $err")
}
- case state@Starting(src, bd) =>
+ case state @ Starting(src, bd) =>
log.debug("Update build state: starting")
update(state)
val so = openOut()
@@ -86,7 +94,7 @@ trait Schedulers extends { self: Core with Source with Builders with Parsers =>
self ! Failed(s"Failed to start build $err")
}
- case state@Running(id) =>
+ case state @ Running(id) =>
log.debug("Update build state: running")
update(state)
containerId = Some(id)
@@ -97,25 +105,31 @@ trait Schedulers extends { self: Core with Source with Builders with Parsers =>
self ! Failed(s"Error waiting for build to complete")
}
- case state@Finished(status) =>
+ case state @ Finished(status) =>
log.debug("Update build state: finished")
update(state)
context stop self
- case state@Failed(message) =>
+ case state @ Failed(message) =>
log.debug("Update build state: failed")
update(state)
context stop self
}
}
object BuildManager {
- def apply(buildId: String, url: URL, out: () => OutputStream, update: BuildState => Unit) =
+ def apply(buildId: String,
+ url: URL,
+ out: () => OutputStream,
+ update: BuildState => Unit) =
Props(new BuildManager(url, out, update))
}
private sealed trait SchedulerCommand
private case class ScheduleBuild(
- buildId: String, url: URL, out: () => OutputStream, update: BuildState => Unit
+ buildId: String,
+ url: URL,
+ out: () => OutputStream,
+ update: BuildState => Unit
) extends SchedulerCommand
class Scheduler extends Actor {
@@ -128,8 +142,9 @@ trait Schedulers extends { self: Core with Source with Builders with Parsers =>
runningBuilds.get(sb.buildId) match {
case Some(_) => //already running
case None =>
- val buildManager = context.actorOf(BuildManager(
- sb.buildId, sb.url, sb.out, sb.update), s"build-${sb.buildId}")
+ val buildManager = context.actorOf(
+ BuildManager(sb.buildId, sb.url, sb.out, sb.update),
+ s"build-${sb.buildId}")
context watch buildManager
runningBuilds += sb.buildId -> buildManager
}
@@ -142,13 +157,14 @@ trait Schedulers extends { self: Core with Source with Builders with Parsers =>
}
}
- private val scheduler = system.actorOf(Props(new Scheduler()), "crashbox-scheduler")
+ private val scheduler =
+ system.actorOf(Props(new Scheduler()), "crashbox-scheduler")
def start(
- buildId: String,
- url: URL,
- out: () => OutputStream,
- update: BuildState => Unit
+ buildId: String,
+ url: URL,
+ out: () => OutputStream,
+ update: BuildState => Unit
): Unit = {
scheduler ! ScheduleBuild(buildId, url, out, update)
}