aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/trace/Incubator.scala
blob: d363d77130fa670095cbbee38f3decc8b65ca6ab (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package kamon.trace

import akka.actor.{ Props, Actor, ActorRef }
import kamon.trace.Incubator.{ CheckForCompletedTraces, IncubatingTrace }
import scala.annotation.tailrec
import scala.collection.immutable.Queue
import scala.concurrent.duration._

class Incubator(subscriptions: ActorRef, maxIncubationNanoTime: Long) extends Actor {
  import context.dispatcher
  val checkSchedule = context.system.scheduler.schedule(100 millis, 100 millis, self, CheckForCompletedTraces)
  var incubating = Queue.empty[IncubatingTrace]

  def receive = {
    case CheckForCompletedTraces  dispatchCompleted()
    case tc: TracingContext       incubating = incubating.enqueue(IncubatingTrace(tc))
  }

  @tailrec private def dispatchCompleted(): Unit = {
    if (incubating.nonEmpty) {
      val it = incubating.head
      if (!it.tc.shouldIncubate || it.incubationNanoTime >= maxIncubationNanoTime) {
        it.tc.generateTraceInfo.map(subscriptions ! _)
        incubating = incubating.tail
        dispatchCompleted()
      }
    }
  }

  override def postStop(): Unit = {
    super.postStop()
    checkSchedule.cancel()
  }
}

object Incubator {

  def props(subscriptions: ActorRef, maxIncubationNanoTime: Long): Props = Props(new Incubator(subscriptions, maxIncubationNanoTime))

  case object CheckForCompletedTraces
  case class IncubatingTrace(tc: TracingContext) {
    private val incubationStartNanoTime = System.nanoTime()
    def incubationNanoTime: Long = System.nanoTime() - incubationStartNanoTime
  }
}