summaryrefslogblamecommitdiff
path: root/crashboxd/src/main/scala/io/crashbox/ci/Builder.scala
blob: f77ce0aaa4389d816405a0fd6569c5e5800ea770 (plain) (tree)



























































































































































































































                                                                                          
package io.crashbox.ci

import akka.actor.{ Actor, ActorLogging, ActorRef, ActorSystem }
import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler, StageLogging }
import akka.stream.{ Attributes, FanInShape2, Inlet, Outlet }
import akka.stream.stage.{ GraphStage }
import io.crashbox.ci.DockerExecutor.ExecutionId
import java.io.{ File, OutputStream }
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.concurrent.TrieMap
import scala.collection.mutable.{ HashMap, Queue }
import scala.concurrent.Future
import scala.util.{ Failure, Success }


case class BuildId(id: String) extends AnyVal

case class TaskId(buildId: String, taskIdx: Int) {
  override def toString = s"$buildId#$taskIdx"
}

class BuildStage(
  executor: DockerExecutor,
  mkdir: TaskId => File,
  mkout: TaskId => OutputStream
) extends GraphStage[FanInShape2[(TaskId, TaskDef), TaskId, Builder.BuildState]] {
  import Builder._

  val submissions = Inlet[(TaskId, TaskDef)]("Builder.submissions")
  val cancellations = Inlet[TaskId]("Builder.cancellations")
  val states = Outlet[BuildState]("Builder.states")

  val shape = new FanInShape2(submissions, cancellations, states)

  def createLogic(attributes: Attributes) = new GraphStageLogic(shape) with StageLogging {
    implicit def ec = materializer.executionContext
    //import scala.concurrent.ExecutionContext.Implicits.global

    var runningTaskId: Option[TaskId] = None
    var runningExecutionId: Future[ExecutionId] =
      Future.failed(new RuntimeException("not started"))

    override def preStart(): Unit = {
      log.info("Starting build stage")
      pull(cancellations)
      pull(submissions)
    }

    setHandler(cancellations, new InHandler {

      override def onPush(): Unit = {
        val tid = grab(cancellations)
        if (runningTaskId == tid) {
          runningExecutionId.foreach { eid =>
            executor.stop(eid)
          }
        }
        pull(cancellations)
      }

      // don't finish stage when cancellations closes
      override def onUpstreamFinish() = {}

    })

    setHandler(submissions, new InHandler {

      val changeState = getAsyncCallback[BuildState]{state =>
        log.info(s"${state.taskId} transitioned to $state")
        emit(states, state)
      }
      val pullNext = getAsyncCallback[Unit]{ _ =>
        if (isAvailable(states)) {
          pull(submissions)
        }
      }

      override def onPush(): Unit = {
        val (tid, tdef) = grab(submissions)
        log.info(s"$tid new submission $tdef")

        val image = tdef.environment match {
          case DockerEnvironment(image) => image
          case env => sys.error("Unsupported environemnt")
        }
        
        changeState.invoke(TaskStarting(tid, tdef))
        runningTaskId = Some(tid)
        runningExecutionId = executor.start(
          image,
          tdef.script,
          mkdir(tid),
          mkout(tid)
        ).map{ eid =>
          changeState.invoke(TaskRunning(tid, eid))
          eid
        }
        
        runningExecutionId.flatMap { eid =>
          executor.result(eid)
        } onComplete { result =>
          result match {
            case Success(status) =>
              changeState.invoke(TaskFinished(tid, status))
            case Failure(err) =>
              changeState.invoke(TaskFailed(tid, err.toString))
          }
          pullNext.invoke(())
        }
      }

      override def onUpstreamFinish() = {
        log.info(s"${submissions} finished, completing stage after final build")
        val callback = getAsyncCallback[Unit]{_ =>
          completeStage()
        }
        runningExecutionId.onComplete { _ => callback.invoke(()) }
      }

    })

    setHandler(states, GraphStageLogic.IgnoreTerminateOutput)

    override def postStop() = {
      log.info("postStop")
      runningExecutionId foreach { eid =>
        log.error("kill")
        executor.stop(eid)
      }
    }

  }
}

// class Builder(
//   listener: ActorRef,
//   exec: DockerExecutor,
//   mkdir: (BuildId, Int) => File,
//   mkout: (BuildId, Int) => OutputStream
// ) extends Actor with ActorLogging {
//   import context._
//   import Builder._

//   val queue = Queue.empty[(BuildId, Int, BuildDef)]
//   val running = TrieMap.empty[(BuildId, Int), ExecutionId]

//   val Parallelism = 2
//   var slots = Parallelism

//   def receive = {

//     case Cancel(bid) =>
//       running.find(_._1._1 == bid).foreach { case (_, eid)  =>
//         // stopping will cause the pipeline to fail an thus remove itself from
//         // the running map
//         exec.stop(eid)
//       }

//     case Done =>
//       slots += 1
//       listener ! Next

//     case Next if slots > 0 =>
//       slots -= 1
//       val (bid, tid, bdf) = queue.dequeue()
//       val tdf = bdf.tasks(tid)
//       val image = tdf.environment match {
//         case DockerEnvironment(image) => image
//         case env => {log.error(s"can't run $env"); ???}
//       }

//       listener ! TaskStarting(bid, tid, tdf)

//       val pipeline = for (
//         eid <- exec.start(image, tdf.script, mkdir(bid, tid), mkout(bid, tid));
//         _ = listener ! TaskRunning(bid, tid, eid);
//         _ = running += (bid, tid) -> eid;
//         status <- exec.result(eid)
//       ) yield {
//         status
//       }
//       pipeline onComplete { res =>
//         running -= ((bid, tid))
//         self ! Done
//         res match {
//           case Success(status) => listener ! TaskFinished(bid, tid, status)
//           case Failure(err) => listener ! TaskFailed(bid, tid, err.toString)
//         }
//       }

//     case Next => // no slots available, do nothing

//   }

//   override def postStop() = {
//     for ((_, eid) <- running) {
//       exec.stop(eid)
//     }
//   }

// }

object Builder {

  //case class Submit(id: BuildId, build: BuildDef)
  //case class Cancel(id: BuildId)

  private case object Next
  private case object Done

  sealed trait BuildState {
    def taskId: TaskId
  }
  case class TaskStarting(taskId: TaskId, taskDef: TaskDef) extends BuildState
  case class TaskRunning(taskId: TaskId, execId: ExecutionId) extends BuildState

  case class TaskFinished(taskId: TaskId, status: Int) extends BuildState
  case class TaskFailed(taskId: TaskId, message: String) extends BuildState

}