aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/trace/Incubator.scala
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2014-12-04 03:20:41 +0100
committerIvan Topolnjak <ivantopo@gmail.com>2014-12-04 03:20:41 +0100
commit432fb45952c587bcebf81d718188e7067572cf49 (patch)
tree0aad1362df0af3a8b14a246edb609e2eb621d28f /kamon-core/src/main/scala/kamon/trace/Incubator.scala
parent46d823ec5ab0265edacf7f704ad0e0c8a61609d1 (diff)
downloadKamon-432fb45952c587bcebf81d718188e7067572cf49.tar.gz
Kamon-432fb45952c587bcebf81d718188e7067572cf49.tar.bz2
Kamon-432fb45952c587bcebf81d718188e7067572cf49.zip
+ core: cleanup the simple trace implementation
Diffstat (limited to 'kamon-core/src/main/scala/kamon/trace/Incubator.scala')
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Incubator.scala71
1 files changed, 53 insertions, 18 deletions
diff --git a/kamon-core/src/main/scala/kamon/trace/Incubator.scala b/kamon-core/src/main/scala/kamon/trace/Incubator.scala
index d363d771..df51f411 100644
--- a/kamon-core/src/main/scala/kamon/trace/Incubator.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Incubator.scala
@@ -1,32 +1,70 @@
package kamon.trace
-import akka.actor.{ Props, Actor, ActorRef }
+import java.util.concurrent.TimeUnit
+
+import akka.actor.{ ActorLogging, Props, Actor, ActorRef }
+import kamon.{ NanoInterval, RelativeNanoTimestamp }
import kamon.trace.Incubator.{ CheckForCompletedTraces, IncubatingTrace }
import scala.annotation.tailrec
import scala.collection.immutable.Queue
import scala.concurrent.duration._
-class Incubator(subscriptions: ActorRef, maxIncubationNanoTime: Long) extends Actor {
+class Incubator(subscriptions: ActorRef) extends Actor with ActorLogging {
import context.dispatcher
- val checkSchedule = context.system.scheduler.schedule(100 millis, 100 millis, self, CheckForCompletedTraces)
- var incubating = Queue.empty[IncubatingTrace]
+ val config = context.system.settings.config.getConfig("kamon.trace.incubator")
+
+ val minIncubationTime = new NanoInterval(config.getDuration("min-incubation-time", TimeUnit.NANOSECONDS))
+ val maxIncubationTime = new NanoInterval(config.getDuration("max-incubation-time", TimeUnit.NANOSECONDS))
+ val checkInterval = config.getDuration("check-interval", TimeUnit.MILLISECONDS)
+
+ val checkSchedule = context.system.scheduler.schedule(checkInterval.millis, checkInterval.millis, self, CheckForCompletedTraces)
+ var waitingForMinimumIncubation = Queue.empty[IncubatingTrace]
+ var waitingForIncubationFinish = List.empty[IncubatingTrace]
def receive = {
- case CheckForCompletedTraces ⇒ dispatchCompleted()
- case tc: TracingContext ⇒ incubating = incubating.enqueue(IncubatingTrace(tc))
+ case tc: TracingContext ⇒ incubate(tc)
+ case CheckForCompletedTraces ⇒
+ checkWaitingForMinimumIncubation()
+ checkWaitingForIncubationFinish()
}
- @tailrec private def dispatchCompleted(): Unit = {
- if (incubating.nonEmpty) {
- val it = incubating.head
- if (!it.tc.shouldIncubate || it.incubationNanoTime >= maxIncubationNanoTime) {
- it.tc.generateTraceInfo.map(subscriptions ! _)
- incubating = incubating.tail
- dispatchCompleted()
+ def incubate(tc: TracingContext): Unit =
+ waitingForMinimumIncubation = waitingForMinimumIncubation.enqueue(IncubatingTrace(tc, RelativeNanoTimestamp.now))
+
+ @tailrec private def checkWaitingForMinimumIncubation(): Unit = {
+ if (waitingForMinimumIncubation.nonEmpty) {
+ val it = waitingForMinimumIncubation.head
+ if (NanoInterval.since(it.incubationStart) >= minIncubationTime) {
+ waitingForMinimumIncubation = waitingForMinimumIncubation.tail
+
+ if (it.tc.shouldIncubate)
+ waitingForIncubationFinish = it :: waitingForIncubationFinish
+ else
+ dispatchTraceInfo(it.tc)
+
+ checkWaitingForMinimumIncubation()
}
}
}
+ private def checkWaitingForIncubationFinish(): Unit = {
+ waitingForIncubationFinish = waitingForIncubationFinish.filter {
+ case IncubatingTrace(context, incubationStart) ⇒
+ if (!context.shouldIncubate) {
+ dispatchTraceInfo(context)
+ false
+ } else {
+ if (NanoInterval.since(incubationStart) >= maxIncubationTime) {
+ log.warning("Trace [{}] with token [{}] has reached the maximum incubation time, will be reported as is.", context.name, context.token)
+ dispatchTraceInfo(context);
+ false
+ } else true
+ }
+ }
+ }
+
+ def dispatchTraceInfo(tc: TracingContext): Unit = subscriptions ! tc.generateTraceInfo
+
override def postStop(): Unit = {
super.postStop()
checkSchedule.cancel()
@@ -35,11 +73,8 @@ class Incubator(subscriptions: ActorRef, maxIncubationNanoTime: Long) extends Ac
object Incubator {
- def props(subscriptions: ActorRef, maxIncubationNanoTime: Long): Props = Props(new Incubator(subscriptions, maxIncubationNanoTime))
+ def props(subscriptions: ActorRef): Props = Props(new Incubator(subscriptions))
case object CheckForCompletedTraces
- case class IncubatingTrace(tc: TracingContext) {
- private val incubationStartNanoTime = System.nanoTime()
- def incubationNanoTime: Long = System.nanoTime() - incubationStartNanoTime
- }
+ case class IncubatingTrace(tc: TracingContext, incubationStart: RelativeNanoTimestamp)
}