From 432fb45952c587bcebf81d718188e7067572cf49 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Thu, 4 Dec 2014 03:20:41 +0100 Subject: + core: cleanup the simple trace implementation --- .../src/main/scala/kamon/trace/Incubator.scala | 71 ++++++++++++++++------ 1 file changed, 53 insertions(+), 18 deletions(-) (limited to 'kamon-core/src/main/scala/kamon/trace/Incubator.scala') diff --git a/kamon-core/src/main/scala/kamon/trace/Incubator.scala b/kamon-core/src/main/scala/kamon/trace/Incubator.scala index d363d771..df51f411 100644 --- a/kamon-core/src/main/scala/kamon/trace/Incubator.scala +++ b/kamon-core/src/main/scala/kamon/trace/Incubator.scala @@ -1,32 +1,70 @@ package kamon.trace -import akka.actor.{ Props, Actor, ActorRef } +import java.util.concurrent.TimeUnit + +import akka.actor.{ ActorLogging, Props, Actor, ActorRef } +import kamon.{ NanoInterval, RelativeNanoTimestamp } import kamon.trace.Incubator.{ CheckForCompletedTraces, IncubatingTrace } import scala.annotation.tailrec import scala.collection.immutable.Queue import scala.concurrent.duration._ -class Incubator(subscriptions: ActorRef, maxIncubationNanoTime: Long) extends Actor { +class Incubator(subscriptions: ActorRef) extends Actor with ActorLogging { import context.dispatcher - val checkSchedule = context.system.scheduler.schedule(100 millis, 100 millis, self, CheckForCompletedTraces) - var incubating = Queue.empty[IncubatingTrace] + val config = context.system.settings.config.getConfig("kamon.trace.incubator") + + val minIncubationTime = new NanoInterval(config.getDuration("min-incubation-time", TimeUnit.NANOSECONDS)) + val maxIncubationTime = new NanoInterval(config.getDuration("max-incubation-time", TimeUnit.NANOSECONDS)) + val checkInterval = config.getDuration("check-interval", TimeUnit.MILLISECONDS) + + val checkSchedule = context.system.scheduler.schedule(checkInterval.millis, checkInterval.millis, self, CheckForCompletedTraces) + var waitingForMinimumIncubation = Queue.empty[IncubatingTrace] + var waitingForIncubationFinish = List.empty[IncubatingTrace] def receive = { - case CheckForCompletedTraces ⇒ dispatchCompleted() - case tc: TracingContext ⇒ incubating = incubating.enqueue(IncubatingTrace(tc)) + case tc: TracingContext ⇒ incubate(tc) + case CheckForCompletedTraces ⇒ + checkWaitingForMinimumIncubation() + checkWaitingForIncubationFinish() } - @tailrec private def dispatchCompleted(): Unit = { - if (incubating.nonEmpty) { - val it = incubating.head - if (!it.tc.shouldIncubate || it.incubationNanoTime >= maxIncubationNanoTime) { - it.tc.generateTraceInfo.map(subscriptions ! _) - incubating = incubating.tail - dispatchCompleted() + def incubate(tc: TracingContext): Unit = + waitingForMinimumIncubation = waitingForMinimumIncubation.enqueue(IncubatingTrace(tc, RelativeNanoTimestamp.now)) + + @tailrec private def checkWaitingForMinimumIncubation(): Unit = { + if (waitingForMinimumIncubation.nonEmpty) { + val it = waitingForMinimumIncubation.head + if (NanoInterval.since(it.incubationStart) >= minIncubationTime) { + waitingForMinimumIncubation = waitingForMinimumIncubation.tail + + if (it.tc.shouldIncubate) + waitingForIncubationFinish = it :: waitingForIncubationFinish + else + dispatchTraceInfo(it.tc) + + checkWaitingForMinimumIncubation() } } } + private def checkWaitingForIncubationFinish(): Unit = { + waitingForIncubationFinish = waitingForIncubationFinish.filter { + case IncubatingTrace(context, incubationStart) ⇒ + if (!context.shouldIncubate) { + dispatchTraceInfo(context) + false + } else { + if (NanoInterval.since(incubationStart) >= maxIncubationTime) { + log.warning("Trace [{}] with token [{}] has reached the maximum incubation time, will be reported as is.", context.name, context.token) + dispatchTraceInfo(context); + false + } else true + } + } + } + + def dispatchTraceInfo(tc: TracingContext): Unit = subscriptions ! tc.generateTraceInfo + override def postStop(): Unit = { super.postStop() checkSchedule.cancel() @@ -35,11 +73,8 @@ class Incubator(subscriptions: ActorRef, maxIncubationNanoTime: Long) extends Ac object Incubator { - def props(subscriptions: ActorRef, maxIncubationNanoTime: Long): Props = Props(new Incubator(subscriptions, maxIncubationNanoTime)) + def props(subscriptions: ActorRef): Props = Props(new Incubator(subscriptions)) case object CheckForCompletedTraces - case class IncubatingTrace(tc: TracingContext) { - private val incubationStartNanoTime = System.nanoTime() - def incubationNanoTime: Long = System.nanoTime() - incubationStartNanoTime - } + case class IncubatingTrace(tc: TracingContext, incubationStart: RelativeNanoTimestamp) } -- cgit v1.2.3