From c71d44cb57c5daddaa8e58cd1b559bb88bbd2a04 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Sun, 31 Aug 2014 21:52:06 -0300 Subject: + core: initial support for akka remoting/cluster, related to #61 --- .../src/main/scala/kamon/trace/TraceContext.scala | 54 ++++++++++++++++++---- 1 file changed, 45 insertions(+), 9 deletions(-) (limited to 'kamon-core/src/main/scala/kamon/trace/TraceContext.scala') diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala index 9ce3cd4e..6ea30511 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala @@ -16,6 +16,8 @@ package kamon.trace +import java.io.ObjectStreamException + import akka.actor.ActorSystem import kamon.Kamon import kamon.metric._ @@ -32,6 +34,9 @@ trait TraceContext { def levelOfDetail: TracingLevelOfDetail def startSegment(identity: SegmentIdentity, metadata: Map[String, String]): SegmentCompletionHandle def finish(metadata: Map[String, String]) + def origin: TraceContextOrigin + def startMilliTime: Long + def isOpen: Boolean private[kamon] val traceLocalStorage: TraceLocalStorage = new TraceLocalStorage } @@ -51,7 +56,13 @@ case object OnlyMetrics extends TracingLevelOfDetail case object SimpleTrace extends TracingLevelOfDetail case object FullTrace extends TracingLevelOfDetail -trait TraceContextAware { +sealed trait TraceContextOrigin +object TraceContextOrigin { + case object Local extends TraceContextOrigin + case object Remote extends TraceContextOrigin +} + +trait TraceContextAware extends Serializable { def captureNanoTime: Long def traceContext: Option[TraceContext] } @@ -60,8 +71,20 @@ object TraceContextAware { def default: TraceContextAware = new DefaultTraceContextAware class DefaultTraceContextAware extends TraceContextAware { - val captureNanoTime = System.nanoTime() - val traceContext = TraceRecorder.currentContext + @transient val captureNanoTime = System.nanoTime() + @transient val traceContext = TraceRecorder.currentContext + + // + // Beware of this hack, it might bite us in the future! + // + // When using remoting/cluster all messages carry the TraceContext in the envelope in which they + // are sent but that doesn't apply to System Messages. We are certain that the TraceContext is + // available (if any) when the system messages are read and this will make sure that it is correctly + // captured and propagated. + @throws[ObjectStreamException] + private def readResolve: AnyRef = { + new DefaultTraceContextAware + } } } @@ -75,11 +98,15 @@ object SegmentCompletionHandleAware { class DefaultSegmentCompletionHandleAware extends DefaultTraceContextAware with SegmentCompletionHandleAware {} } -class SimpleMetricCollectionContext(@volatile private var _name: String, val token: String, metadata: Map[String, String], - val system: ActorSystem) extends TraceContext { - @volatile private var _isOpen = true +class SimpleMetricCollectionContext(traceName: String, val token: String, metadata: Map[String, String], + val origin: TraceContextOrigin, val system: ActorSystem, val startMilliTime: Long = System.currentTimeMillis, + izOpen: Boolean = true) extends TraceContext { + + @volatile private var _name = traceName + @volatile private var _isOpen = izOpen + val levelOfDetail = OnlyMetrics - val startMark = System.nanoTime() + val startNanoTime = System.nanoTime() val finishedSegments = new ConcurrentLinkedQueue[SegmentData]() val metricsExtension = Kamon(Metrics)(system) @@ -91,11 +118,20 @@ class SimpleMetricCollectionContext(@volatile private var _name: String, val tok def finish(metadata: Map[String, String]): Unit = { _isOpen = false - val finishMark = System.nanoTime() + + val elapsedNanoTime = + if (origin == TraceContextOrigin.Local) + // Everything is local, nanoTime is still the best resolution we can use. + System.nanoTime() - startNanoTime + else + // For a remote TraceContext we can only rely on the startMilliTime and we need to scale it to nanoseconds + // to be consistent with unit used for all latency measurements. + (System.currentTimeMillis() - startMilliTime) * 1000000L + val metricRecorder = metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory) metricRecorder.map { traceMetrics ⇒ - traceMetrics.elapsedTime.record(finishMark - startMark) + traceMetrics.elapsedTime.record(elapsedNanoTime) drainFinishedSegments(traceMetrics) } } -- cgit v1.2.3