From 594c7a1729789eae7037918cde7287bdc4111b70 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Wed, 3 Dec 2014 02:10:46 +0100 Subject: = core: first simple approach to providing traces and a subscription mechanism. --- .../src/main/scala/kamon/trace/Incubator.scala | 45 ++++++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 kamon-core/src/main/scala/kamon/trace/Incubator.scala (limited to 'kamon-core/src/main/scala/kamon/trace/Incubator.scala') diff --git a/kamon-core/src/main/scala/kamon/trace/Incubator.scala b/kamon-core/src/main/scala/kamon/trace/Incubator.scala new file mode 100644 index 00000000..d363d771 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/Incubator.scala @@ -0,0 +1,45 @@ +package kamon.trace + +import akka.actor.{ Props, Actor, ActorRef } +import kamon.trace.Incubator.{ CheckForCompletedTraces, IncubatingTrace } +import scala.annotation.tailrec +import scala.collection.immutable.Queue +import scala.concurrent.duration._ + +class Incubator(subscriptions: ActorRef, maxIncubationNanoTime: Long) extends Actor { + import context.dispatcher + val checkSchedule = context.system.scheduler.schedule(100 millis, 100 millis, self, CheckForCompletedTraces) + var incubating = Queue.empty[IncubatingTrace] + + def receive = { + case CheckForCompletedTraces ⇒ dispatchCompleted() + case tc: TracingContext ⇒ incubating = incubating.enqueue(IncubatingTrace(tc)) + } + + @tailrec private def dispatchCompleted(): Unit = { + if (incubating.nonEmpty) { + val it = incubating.head + if (!it.tc.shouldIncubate || it.incubationNanoTime >= maxIncubationNanoTime) { + it.tc.generateTraceInfo.map(subscriptions ! _) + incubating = incubating.tail + dispatchCompleted() + } + } + } + + override def postStop(): Unit = { + super.postStop() + checkSchedule.cancel() + } +} + +object Incubator { + + def props(subscriptions: ActorRef, maxIncubationNanoTime: Long): Props = Props(new Incubator(subscriptions, maxIncubationNanoTime)) + + case object CheckForCompletedTraces + case class IncubatingTrace(tc: TracingContext) { + private val incubationStartNanoTime = System.nanoTime() + def incubationNanoTime: Long = System.nanoTime() - incubationStartNanoTime + } +} -- cgit v1.2.3