blob: d363d77130fa670095cbbee38f3decc8b65ca6ab (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
package kamon.trace
import akka.actor.{ Props, Actor, ActorRef }
import kamon.trace.Incubator.{ CheckForCompletedTraces, IncubatingTrace }
import scala.annotation.tailrec
import scala.collection.immutable.Queue
import scala.concurrent.duration._
class Incubator(subscriptions: ActorRef, maxIncubationNanoTime: Long) extends Actor {
import context.dispatcher
val checkSchedule = context.system.scheduler.schedule(100 millis, 100 millis, self, CheckForCompletedTraces)
var incubating = Queue.empty[IncubatingTrace]
def receive = {
case CheckForCompletedTraces ⇒ dispatchCompleted()
case tc: TracingContext ⇒ incubating = incubating.enqueue(IncubatingTrace(tc))
}
@tailrec private def dispatchCompleted(): Unit = {
if (incubating.nonEmpty) {
val it = incubating.head
if (!it.tc.shouldIncubate || it.incubationNanoTime >= maxIncubationNanoTime) {
it.tc.generateTraceInfo.map(subscriptions ! _)
incubating = incubating.tail
dispatchCompleted()
}
}
}
override def postStop(): Unit = {
super.postStop()
checkSchedule.cancel()
}
}
object Incubator {
def props(subscriptions: ActorRef, maxIncubationNanoTime: Long): Props = Props(new Incubator(subscriptions, maxIncubationNanoTime))
case object CheckForCompletedTraces
case class IncubatingTrace(tc: TracingContext) {
private val incubationStartNanoTime = System.nanoTime()
def incubationNanoTime: Long = System.nanoTime() - incubationStartNanoTime
}
}
|