aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/legacy-main/scala/kamon/trace/Incubator.scala
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-core/src/legacy-main/scala/kamon/trace/Incubator.scala')
-rw-r--r--kamon-core/src/legacy-main/scala/kamon/trace/Incubator.scala94
1 files changed, 94 insertions, 0 deletions
diff --git a/kamon-core/src/legacy-main/scala/kamon/trace/Incubator.scala b/kamon-core/src/legacy-main/scala/kamon/trace/Incubator.scala
new file mode 100644
index 00000000..fe5ad569
--- /dev/null
+++ b/kamon-core/src/legacy-main/scala/kamon/trace/Incubator.scala
@@ -0,0 +1,94 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.trace
+
+import akka.actor.{ActorLogging, Props, Actor, ActorRef}
+import kamon.trace.Incubator.{CheckForCompletedTraces, IncubatingTrace}
+import kamon.util.{NanoInterval, RelativeNanoTimestamp}
+import scala.annotation.tailrec
+import scala.collection.immutable.Queue
+import kamon.util.ConfigTools.Syntax
+
+class Incubator(subscriptions: ActorRef) extends Actor with ActorLogging {
+ import context.dispatcher
+ val config = context.system.settings.config.getConfig("kamon.trace.incubator")
+
+ val minIncubationTime = new NanoInterval(config.getFiniteDuration("min-incubation-time").toNanos)
+ val maxIncubationTime = new NanoInterval(config.getFiniteDuration("max-incubation-time").toNanos)
+ val checkInterval = config.getFiniteDuration("check-interval")
+
+ val checkSchedule = context.system.scheduler.schedule(checkInterval, checkInterval, self, CheckForCompletedTraces)
+ var waitingForMinimumIncubation = Queue.empty[IncubatingTrace]
+ var waitingForIncubationFinish = List.empty[IncubatingTrace]
+
+ def receive = {
+ case tc: TracingContext ⇒ incubate(tc)
+ case CheckForCompletedTraces ⇒
+ checkWaitingForMinimumIncubation()
+ checkWaitingForIncubationFinish()
+ }
+
+ def incubate(tc: TracingContext): Unit =
+ waitingForMinimumIncubation = waitingForMinimumIncubation.enqueue(IncubatingTrace(tc, RelativeNanoTimestamp.now))
+
+ @tailrec private def checkWaitingForMinimumIncubation(): Unit = {
+ if (waitingForMinimumIncubation.nonEmpty) {
+ val it = waitingForMinimumIncubation.head
+ if (NanoInterval.since(it.incubationStart) >= minIncubationTime) {
+ waitingForMinimumIncubation = waitingForMinimumIncubation.tail
+
+ if (it.tc.shouldIncubate)
+ waitingForIncubationFinish = it :: waitingForIncubationFinish
+ else
+ dispatchTraceInfo(it.tc)
+
+ checkWaitingForMinimumIncubation()
+ }
+ }
+ }
+
+ private def checkWaitingForIncubationFinish(): Unit = {
+ waitingForIncubationFinish = waitingForIncubationFinish.filter {
+ case IncubatingTrace(context, incubationStart) ⇒
+ if (!context.shouldIncubate) {
+ dispatchTraceInfo(context)
+ false
+ } else {
+ if (NanoInterval.since(incubationStart) >= maxIncubationTime) {
+ log.warning("Trace [{}] with token [{}] has reached the maximum incubation time, will be reported as is.", context.name, context.token)
+ dispatchTraceInfo(context);
+ false
+ } else true
+ }
+ }
+ }
+
+ def dispatchTraceInfo(tc: TracingContext): Unit = subscriptions ! tc.generateTraceInfo
+
+ override def postStop(): Unit = {
+ super.postStop()
+ checkSchedule.cancel()
+ }
+}
+
+object Incubator {
+
+ def props(subscriptions: ActorRef): Props = Props(new Incubator(subscriptions))
+
+ case object CheckForCompletedTraces
+ case class IncubatingTrace(tc: TracingContext, incubationStart: RelativeNanoTimestamp)
+}