aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2014-01-31 09:01:18 -0300
committerIvan Topolnjak <ivantopo@gmail.com>2014-01-31 09:01:18 -0300
commit59c01d880379dfc48c6d82da13ef628a587a9bbb (patch)
treedd323caa93133a98da5f76be332dfdbf76280ea5
parenta15e17d2462105ad8b72054be58dc9e8f9dc64ed (diff)
downloadKamon-59c01d880379dfc48c6d82da13ef628a587a9bbb.tar.gz
Kamon-59c01d880379dfc48c6d82da13ef628a587a9bbb.tar.bz2
Kamon-59c01d880379dfc48c6d82da13ef628a587a9bbb.zip
remake of trace context and allow different tracing levels
-rw-r--r--kamon-core/src/main/resources/reference.conf4
-rw-r--r--kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala4
-rw-r--r--kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala10
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala4
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala4
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala4
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Segments.scala39
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Trace.scala123
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceContext.scala137
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceCtx.scala125
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala69
-rw-r--r--kamon-core/src/test/scala/akka/testkit/TestProbeTracing.scala2
-rw-r--r--kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala12
-rw-r--r--kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala35
-rw-r--r--kamon-core/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala58
-rw-r--r--kamon-core/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala26
-rw-r--r--kamon-core/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala38
-rw-r--r--kamon-core/src/test/scala/kamon/trace/instrumentation/TraceContextFixture.scala10
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala4
-rw-r--r--kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala4
-rw-r--r--kamon-playground/src/main/resources/application.conf8
-rw-r--r--kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala32
-rw-r--r--kamon-spray/src/main/scala/kamon/spray/Spray.scala37
-rw-r--r--kamon-spray/src/main/scala/kamon/spray/UowDirectives.scala5
-rw-r--r--kamon-spray/src/main/scala/spray/can/client/ClientRequestTracing.scala50
-rw-r--r--kamon-spray/src/main/scala/spray/can/server/ServerRequestTracing.scala37
-rw-r--r--kamon-spray/src/test/scala/kamon/spray/ClientRequestTracingSpec.scala5
-rw-r--r--kamon-spray/src/test/scala/kamon/spray/ServerRequestTracingSpec.scala2
28 files changed, 402 insertions, 486 deletions
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf
index e6124365..a6dd6c15 100644
--- a/kamon-core/src/main/resources/reference.conf
+++ b/kamon-core/src/main/resources/reference.conf
@@ -11,7 +11,7 @@ kamon {
},
{
trace {
- includes = [ "**" ]
+ includes = [ "*" ]
excludes = []
}
},
@@ -46,7 +46,7 @@ kamon {
highest-trackable-value = 3600000000000
significant-value-digits = 2
}
- segments {
+ segment {
highest-trackable-value = 3600000000000
significant-value-digits = 2
}
diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
index 98700974..199b2bb2 100644
--- a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
+++ b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
@@ -49,7 +49,7 @@ class BehaviourInvokeTracing {
val timestampBeforeProcessing = System.nanoTime()
val contextAndTimestamp = envelope.asInstanceOf[TraceContextAware]
- TraceRecorder.withContext(contextAndTimestamp.traceContext) {
+ TraceRecorder.withTraceContext(contextAndTimestamp.traceContext) {
pjp.proceed()
}
@@ -73,7 +73,7 @@ class BehaviourInvokeTracing {
class EnvelopeTraceContextMixin {
@DeclareMixin("akka.dispatch.Envelope")
- def mixin: TraceContextAware = new TraceContextAware {}
+ def mixinTraceContextAwareToEnvelope: TraceContextAware = TraceContextAware.default
@Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)")
def envelopeCreation(ctx: TraceContextAware): Unit = {}
diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala
index d4f8f769..7d03d946 100644
--- a/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala
+++ b/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala
@@ -3,13 +3,13 @@ package akka.instrumentation
import org.aspectj.lang.annotation._
import akka.dispatch.sysmsg.EarliestFirstSystemMessageList
import org.aspectj.lang.ProceedingJoinPoint
-import kamon.trace.{TraceRecorder, TraceContextAware}
+import kamon.trace.{ TraceRecorder, TraceContextAware }
@Aspect
class SystemMessageTraceContextMixin {
@DeclareMixin("akka.dispatch.sysmsg.SystemMessage+")
- def mixin: TraceContextAware = new TraceContextAware {}
+ def mixinTraceContextAwareToSystemMessage: TraceContextAware = TraceContextAware.default
@Pointcut("execution(akka.dispatch.sysmsg.SystemMessage+.new(..)) && this(ctx)")
def envelopeCreation(ctx: TraceContextAware): Unit = {}
@@ -25,7 +25,7 @@ class SystemMessageTraceContextMixin {
class RepointableActorRefTraceContextMixin {
@DeclareMixin("akka.actor.RepointableActorRef")
- def mixin: TraceContextAware = new TraceContextAware {}
+ def mixinTraceContextAwareToRepointableActorRef: TraceContextAware = TraceContextAware.default
@Pointcut("execution(akka.actor.RepointableActorRef.new(..)) && this(ctx)")
def envelopeCreation(ctx: TraceContextAware): Unit = {}
@@ -41,7 +41,7 @@ class RepointableActorRefTraceContextMixin {
@Around("repointableActorRefCreation(repointableActorRef)")
def afterRepointableActorRefCreation(pjp: ProceedingJoinPoint, repointableActorRef: TraceContextAware): Any = {
- TraceRecorder.withContext(repointableActorRef.traceContext) {
+ TraceRecorder.withTraceContext(repointableActorRef.traceContext) {
pjp.proceed()
}
}
@@ -58,7 +58,7 @@ class ActorSystemMessagePassingTracing {
def aroundSystemMessageInvoke(pjp: ProceedingJoinPoint, messages: EarliestFirstSystemMessageList): Any = {
if (messages.nonEmpty) {
val ctx = messages.head.asInstanceOf[TraceContextAware].traceContext
- TraceRecorder.withContext(ctx)(pjp.proceed())
+ TraceRecorder.withTraceContext(ctx)(pjp.proceed())
} else pjp.proceed()
}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala
index abd3514e..954f351a 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala
@@ -23,14 +23,14 @@ import kamon.trace.{ TraceContextAware, TraceRecorder }
class ActorLoggingTracing {
@DeclareMixin("akka.event.Logging.LogEvent+")
- def mixin: TraceContextAware = new TraceContextAware {}
+ def mixinTraceContextAwareToLogEvent: TraceContextAware = TraceContextAware.default
@Pointcut("execution(* akka.event.slf4j.Slf4jLogger.withMdc(..)) && args(logSource, logEvent, logStatement)")
def withMdcInvocation(logSource: String, logEvent: TraceContextAware, logStatement: () ⇒ _): Unit = {}
@Around("withMdcInvocation(logSource, logEvent, logStatement)")
def aroundWithMdcInvocation(pjp: ProceedingJoinPoint, logSource: String, logEvent: TraceContextAware, logStatement: () ⇒ _): Unit = {
- TraceRecorder.withContext(logEvent.traceContext) {
+ TraceRecorder.withTraceContext(logEvent.traceContext) {
pjp.proceed()
}
}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala b/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala
index b8725dd7..634c94a1 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala
@@ -23,7 +23,7 @@ import kamon.trace.{ TraceContextAware, TraceRecorder }
class FutureTracing {
@DeclareMixin("scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable")
- def mixin: TraceContextAware = new TraceContextAware {}
+ def mixinTraceContextAwareToFutureRelatedRunnable: TraceContextAware = TraceContextAware.default
@Pointcut("execution((scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable).new(..)) && this(runnable)")
def futureRelatedRunnableCreation(runnable: TraceContextAware): Unit = {}
@@ -39,7 +39,7 @@ class FutureTracing {
@Around("futureRelatedRunnableExecution(runnable)")
def aroundExecution(pjp: ProceedingJoinPoint, runnable: TraceContextAware): Any = {
- TraceRecorder.withContext(runnable.traceContext) {
+ TraceRecorder.withTraceContext(runnable.traceContext) {
pjp.proceed()
}
}
diff --git a/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala
index 25ebce00..57a79653 100644
--- a/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala
+++ b/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala
@@ -57,9 +57,7 @@ object TraceMetrics extends MetricGroupIdentity.Category with MetricGroupFactory
val elapsedTimeHdrConfig = Configuration.fromConfig(settings.getConfig("elapsed-time"))
val segmentHdrConfig = Configuration.fromConfig(settings.getConfig("segment"))
- new TraceMetricRecorder(
- HighDynamicRangeRecorder(elapsedTimeHdrConfig),
- () ⇒ HighDynamicRangeRecorder(segmentHdrConfig))
+ new TraceMetricRecorder(HighDynamicRangeRecorder(elapsedTimeHdrConfig), () ⇒ HighDynamicRangeRecorder(segmentHdrConfig))
}
}
diff --git a/kamon-core/src/main/scala/kamon/trace/Segments.scala b/kamon-core/src/main/scala/kamon/trace/Segments.scala
deleted file mode 100644
index e6d9745b..00000000
--- a/kamon-core/src/main/scala/kamon/trace/Segments.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/* ===================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================================================== */
-
-package kamon.trace
-
-
-import kamon.trace.TraceOld.SegmentCompletionHandle
-
-object Segments {
-
- trait Category
- case object HttpClientRequest extends Category
-
- case class Start(category: Category, description: String = "",
- attributes: Map[String, String] = Map(), timestamp: Long = System.nanoTime())
-
- case class End(attributes: Map[String, String] = Map(), timestamp: Long = System.nanoTime())
-
- case class Segment(start: Start, end: End)
-
- trait SegmentCompletionHandleAware {
- var completionHandle: Option[SegmentCompletionHandle]
- }
-
- trait ContextAndSegmentCompletionAware extends TraceContextAware with SegmentCompletionHandleAware
-}
diff --git a/kamon-core/src/main/scala/kamon/trace/Trace.scala b/kamon-core/src/main/scala/kamon/trace/Trace.scala
deleted file mode 100644
index bdfd6aa3..00000000
--- a/kamon-core/src/main/scala/kamon/trace/Trace.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/* ===================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================================================== */
-package kamon.trace
-
-import kamon.Kamon
-import akka.actor._
-import scala.Some
-import kamon.trace.TraceOld.Register
-import scala.concurrent.duration._
-import java.util.concurrent.atomic.AtomicLong
-import scala.util.Try
-import java.net.InetAddress
-
-object TraceOld extends ExtensionId[TraceExtension] with ExtensionIdProvider {
- def lookup(): ExtensionId[_ <: Extension] = TraceOld
- def createExtension(system: ExtendedActorSystem): TraceExtension = new TraceExtension(system)
-
- /*** Protocol */
- case object Register
-
- /** User API */
- //private[trace] val traceContext = new DynamicVariable[Option[TraceContext]](None)
- private[trace] val traceContext = new ThreadLocal[Option[TraceContextOld]] {
- override def initialValue(): Option[TraceContextOld] = None
- }
- private[trace] val tranid = new AtomicLong()
-
- def context() = traceContext.get
- private def set(ctx: Option[TraceContextOld]) = traceContext.set(ctx)
-
- def clear: Unit = traceContext.remove()
- def start(name: String, token: Option[String])(implicit system: ActorSystem): TraceContextOld = {
- val ctx = newTraceContext(name, token.getOrElse(TraceToken.generate()))
- ctx.start(name)
- set(Some(ctx))
-
- ctx
- }
-
- def withContext[T](ctx: Option[TraceContextOld])(thunk: ⇒ T): T = {
- val oldval = context
- set(ctx)
-
- try thunk
- finally set(oldval)
- }
-
- def transformContext(f: TraceContextOld ⇒ TraceContextOld): Unit = {
- context.map(f).foreach(ctx ⇒ set(Some(ctx)))
- }
-
- def finish(): Option[TraceContextOld] = {
- val ctx = context()
- ctx.map(_.finish)
- clear
- ctx
- }
-
- // TODO: FIX
- def newTraceContext(name: String, token: String)(implicit system: ActorSystem): TraceContextOld = TraceContextOld(Kamon(TraceOld).api, tranid.getAndIncrement, name, token)
-
- def startSegment(category: Segments.Category, description: String = "", attributes: Map[String, String] = Map()): SegmentCompletionHandle = {
- val start = Segments.Start(category, description, attributes)
- SegmentCompletionHandle(start)
- }
-
- def startSegment(start: Segments.Start): SegmentCompletionHandle = SegmentCompletionHandle(start)
-
- case class SegmentCompletionHandle(start: Segments.Start) {
- def complete(): Unit = {
- val end = Segments.End()
- //println(s"Completing the Segment: $start - $end")
- }
- def complete(end: Segments.End): Unit = {
- //println(s"Completing the Segment: $start - $end")
- }
- }
-}
-
-object TraceToken {
- val tokenCounter = new AtomicLong
- val hostnamePrefix = Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-localhost")
-
- def generate(): String = "%s-%s".format(hostnamePrefix, tokenCounter.incrementAndGet())
-}
-
-class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension {
- val api: ActorRef = system.actorOf(Props[TraceManager], "kamon-trace")
-}
-
-class TraceManager extends Actor with ActorLogging {
- var listeners: Seq[ActorRef] = Seq.empty
-
- def receive = {
- case Register ⇒
- listeners = sender +: listeners
- log.info("Registered [{}] as listener for Kamon traces", sender)
-
- case segment: UowSegment ⇒
- val tracerName = segment.id.toString
- context.child(tracerName).getOrElse(newTracer(tracerName)) ! segment
-
- case trace: UowTrace ⇒
- listeners foreach (_ ! trace)
- }
-
- def newTracer(name: String): ActorRef = {
- context.actorOf(UowTraceAggregator.props(self, 30 seconds), name)
- }
-}
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
index 95a3a8b2..d3759a26 100644
--- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
+++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
@@ -1,36 +1,129 @@
-/* ===================================================
+/*
+ * =========================================================================================
* Copyright © 2013 the kamon project <http://kamon.io/>
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================================================== */
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
package kamon.trace
-import java.util.UUID
-import akka.actor._
-import java.util.concurrent.atomic.AtomicLong
-import scala.concurrent.duration._
+import akka.actor.ActorSystem
import kamon.Kamon
-import kamon.trace.UowTracing.{ Finish, Start }
+import kamon.metrics.{ MetricGroupRecorder, MetricIdentity, TraceMetrics, Metrics }
+import java.util.concurrent.ConcurrentLinkedQueue
+import kamon.trace.TraceContextAware.DefaultTraceContextAware
+import kamon.trace.TraceContext.SegmentIdentity
+
+trait TraceContext {
+ def name: String
+ def token: String
+ def system: ActorSystem
+ def rename(name: String): Unit
+ def levelOfDetail: TracingLevelOfDetail
+ def startSegment(identity: SegmentIdentity, metadata: Map[String, String]): SegmentCompletionHandle
+ def finish(metadata: Map[String, String])
+}
+
+object TraceContext {
+ type SegmentIdentity = MetricIdentity
+}
+
+trait SegmentCompletionHandle {
+ def finish(metadata: Map[String, String])
+}
+
+case class SegmentData(identity: MetricIdentity, duration: Long, metadata: Map[String, String])
-// TODO: Decide if we need or not an ID, generating it takes time and it doesn't seem necessary.
-case class TraceContextOld(private val collector: ActorRef, id: Long, name: String, token: String, userContext: Option[Any] = None) {
+sealed trait TracingLevelOfDetail
+case object OnlyMetrics extends TracingLevelOfDetail
+case object SimpleTrace extends TracingLevelOfDetail
+case object FullTrace extends TracingLevelOfDetail
- def start(name: String) = {
- collector ! Start(id, name)
+trait TraceContextAware {
+ def captureMark: Long
+ def traceContext: Option[TraceContext]
+}
+
+object TraceContextAware {
+ def default: TraceContextAware = new DefaultTraceContextAware
+
+ class DefaultTraceContextAware extends TraceContextAware {
+ val captureMark = System.nanoTime()
+ val traceContext = TraceRecorder.currentContext
}
+}
- def finish: Unit = {
- collector ! Finish(id)
+trait SegmentCompletionHandleAware extends TraceContextAware {
+ @volatile var segmentCompletionHandle: Option[SegmentCompletionHandle] = None
+}
+
+object SegmentCompletionHandleAware {
+ def default: SegmentCompletionHandleAware = new DefaultSegmentCompletionHandleAware
+
+ class DefaultSegmentCompletionHandleAware extends DefaultTraceContextAware with SegmentCompletionHandleAware {}
+}
+
+class SimpleMetricCollectionContext(@volatile private var _name: String, val token: String, metadata: Map[String, String],
+ val system: ActorSystem) extends TraceContext {
+ @volatile private var _isOpen = true
+ val levelOfDetail = OnlyMetrics
+ val startMark = System.nanoTime()
+ val finishedSegments = new ConcurrentLinkedQueue[SegmentData]()
+ val metricsExtension = Kamon(Metrics)(system)
+
+ def name: String = _name
+
+ def rename(newName: String): Unit = _name = newName
+
+ def isOpen(): Boolean = _isOpen
+
+ def finish(metadata: Map[String, String]): Unit = {
+ _isOpen = false
+ val finishMark = System.nanoTime()
+ val metricRecorder = metricsExtension.register(name, TraceMetrics)
+
+ metricRecorder.map { traceMetrics ⇒
+ traceMetrics.elapsedTime.record(finishMark - startMark)
+ drainFinishedSegments(traceMetrics)
+ }
+ }
+
+ private def drainFinishedSegments(metricRecorder: MetricGroupRecorder): Unit = {
+ while (!finishedSegments.isEmpty) {
+ val segmentData = finishedSegments.poll()
+ metricRecorder.record(segmentData.identity, segmentData.duration)
+ }
}
+ private def finishSegment(identity: MetricIdentity, duration: Long, metadata: Map[String, String]): Unit = {
+ finishedSegments.add(SegmentData(identity, duration, metadata))
+
+ if (!_isOpen) {
+ metricsExtension.register(name, TraceMetrics).map { traceMetrics ⇒
+ drainFinishedSegments(traceMetrics)
+ }
+ }
+ }
+
+ def startSegment(identity: SegmentIdentity, metadata: Map[String, String]): SegmentCompletionHandle =
+ new SimpleMetricCollectionCompletionHandle(identity, metadata)
+
+ class SimpleMetricCollectionCompletionHandle(identity: MetricIdentity, startMetadata: Map[String, String]) extends SegmentCompletionHandle {
+ val segmentStartMark = System.nanoTime()
+
+ def finish(metadata: Map[String, String] = Map.empty): Unit = {
+ val segmentFinishMark = System.nanoTime()
+ finishSegment(identity, (segmentFinishMark - segmentStartMark), startMetadata ++ metadata)
+ }
+ }
}
+
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceCtx.scala b/kamon-core/src/main/scala/kamon/trace/TraceCtx.scala
deleted file mode 100644
index 1e552563..00000000
--- a/kamon-core/src/main/scala/kamon/trace/TraceCtx.scala
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.trace
-
-import akka.actor.{ExtendedActorSystem, ActorSystem}
-import akka.dispatch.AbstractNodeQueue
-import kamon.Kamon
-import kamon.metrics.{TraceMetrics, Metrics}
-import java.util.concurrent.atomic.AtomicLong
-
-sealed trait TracingLevelOfDetail
-case object OnlyMetrics extends TracingLevelOfDetail
-case object SimpleTrace extends TracingLevelOfDetail
-case object FullTrace extends TracingLevelOfDetail
-
-trait TraceContext {
- def token: String
- def name: String
- def rename(newName: String): Unit
- def levelOfDetail: TracingLevelOfDetail
- def startSegment
- def finish(metadata: Map[String, String])
-
-}
-
-trait TraceContextAware {
- def captureMark: Long
- def traceContext: Option[TraceContext]
-}
-
-object TraceContextAware {
- def default: TraceContextAware = new TraceContextAware {
- val captureMark = System.nanoTime()
- val traceContext = TraceRecorder.currentContext
- }
-}
-
-object TraceContext {
-
-}
-
-class SimpleMetricCollectionContext(val token: String, @volatile private var _name: String, val system: ActorSystem,
- metadata: Map[String, String]) extends TraceContext {
- val levelOfDetail = OnlyMetrics
-
- @volatile private var _isOpen = true
-
- val startMark = System.nanoTime()
-
- def name: String = _name
-
- def rename(newName: String): Unit = _name = newName
-
- def isOpen(): Boolean = _isOpen
-
- def finish(metadata: Map[String, String]): Unit = {
- val finishMark = System.nanoTime()
-
- // Store all metrics!
- val metricsExtension = Kamon(Metrics)(system)
- val metricRecorder = metricsExtension.register(name, TraceMetrics)
-
- metricRecorder.map { traceMetrics =>
- traceMetrics.elapsedTime.record(finishMark - startMark)
- }
-
- }
-
- override def startSegment: Unit = ???
-
-
-}
-
-private[kamon] class SegmentRecordingQueue extends AbstractNodeQueue[String]
-
-
-
-
-class TraceRecorder(system: ExtendedActorSystem) {
-
-}
-
-object TraceRecorder {
- private val tokenCounter = new AtomicLong
- private val traceContextStorage = new ThreadLocal[Option[TraceContext]] {
- override def initialValue(): Option[TraceContext] = None
- }
-
- private def newTraceContext(name: String, token: Option[String], metadata: Map[String, String], system: ActorSystem): TraceContext = ???
-
- def setContext(context: Option[TraceContext]): Unit = traceContextStorage.set(context)
-
- def clearContext: Unit = traceContextStorage.set(None)
-
- def currentContext: Option[TraceContext] = traceContextStorage.get()
-
- def start(name: String, token: Option[String] = None, metadata: Map[String, String] = Map.empty)(implicit system: ActorSystem) = {
- val ctx = newTraceContext(name, token, metadata, system)
- traceContextStorage.set(Some(ctx))
- }
-
- def withContext[T](context: Option[TraceContext])(thunk: => T): T = {
- val oldContext = currentContext
- setContext(context)
-
- try thunk finally setContext(oldContext)
- }
-
- def finish(metadata: Map[String, String] = Map.empty): Unit = currentContext.map(_.finish(metadata))
-
-}
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala
new file mode 100644
index 00000000..3e3bb19f
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala
@@ -0,0 +1,69 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.trace
+
+import java.util.concurrent.atomic.AtomicLong
+import scala.util.Try
+import java.net.InetAddress
+import akka.actor.ActorSystem
+import kamon.trace.TraceContext.SegmentIdentity
+
+object TraceRecorder {
+ private val traceContextStorage = new ThreadLocal[Option[TraceContext]] {
+ override def initialValue(): Option[TraceContext] = None
+ }
+
+ private val tokenCounter = new AtomicLong
+ private val hostnamePrefix = Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-localhost")
+
+ def newToken = "%s-%s".format(hostnamePrefix, tokenCounter.incrementAndGet())
+
+ private def newTraceContext(name: String, token: Option[String], metadata: Map[String, String],
+ system: ActorSystem): TraceContext = {
+
+ // In the future this should select between implementations.
+ val finalToken = token.getOrElse(newToken)
+ new SimpleMetricCollectionContext(name, finalToken, metadata, system)
+ }
+
+ def setContext(context: Option[TraceContext]): Unit = traceContextStorage.set(context)
+
+ def clearContext: Unit = traceContextStorage.set(None)
+
+ def currentContext: Option[TraceContext] = traceContextStorage.get()
+
+ def start(name: String, token: Option[String] = None, metadata: Map[String, String] = Map.empty)(implicit system: ActorSystem) = {
+ val ctx = newTraceContext(name, token, metadata, system)
+ traceContextStorage.set(Some(ctx))
+ }
+
+ def startSegment(identity: SegmentIdentity, metadata: Map[String, String]): Option[SegmentCompletionHandle] =
+ currentContext.map(_.startSegment(identity, metadata))
+
+ def withNewTraceContext[T](name: String, token: Option[String] = None, metadata: Map[String, String] = Map.empty)(thunk: ⇒ T)(implicit system: ActorSystem): T =
+ withTraceContext(Some(newTraceContext(name, token, metadata, system)))(thunk)
+
+ def withTraceContext[T](context: Option[TraceContext])(thunk: ⇒ T): T = {
+ val oldContext = currentContext
+ setContext(context)
+
+ try thunk finally setContext(oldContext)
+ }
+
+ def finish(metadata: Map[String, String] = Map.empty): Unit = currentContext.map(_.finish(metadata))
+
+}
diff --git a/kamon-core/src/test/scala/akka/testkit/TestProbeTracing.scala b/kamon-core/src/test/scala/akka/testkit/TestProbeTracing.scala
index c681e921..a050145a 100644
--- a/kamon-core/src/test/scala/akka/testkit/TestProbeTracing.scala
+++ b/kamon-core/src/test/scala/akka/testkit/TestProbeTracing.scala
@@ -46,7 +46,7 @@ class TestProbeTracing {
case _ ⇒ None
}
- TraceRecorder.withContext(traceContext) {
+ TraceRecorder.withTraceContext(traceContext) {
pjp.proceed
}
}
diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala
index d6648cef..a0d8e933 100644
--- a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala
+++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorLoggingSpec.scala
@@ -18,24 +18,24 @@ package kamon.trace.instrumentation
import akka.testkit.TestKit
import org.scalatest.{ Inspectors, Matchers, WordSpecLike }
import akka.actor.{ Props, ActorLogging, Actor, ActorSystem }
-import akka.event.Logging.{ LogEvent }
-import kamon.trace.{ ContextAware, TraceContext, Trace }
+import akka.event.Logging.LogEvent
+import kamon.trace.{TraceContextAware, TraceRecorder}
class ActorLoggingSpec extends TestKit(ActorSystem("actor-logging-spec")) with WordSpecLike with Matchers with Inspectors {
"the ActorLogging instrumentation" should {
"attach the TraceContext (if available) to log events" in {
- val testTraceContext = Some(TraceContext(Actor.noSender, 1, "test", "test-1"))
val loggerActor = system.actorOf(Props[LoggerActor])
system.eventStream.subscribe(testActor, classOf[LogEvent])
- Trace.withContext(testTraceContext) {
+ val testTraceContext = TraceRecorder.withNewTraceContext("logging") {
loggerActor ! "info"
+ TraceRecorder.currentContext
}
fishForMessage() {
case event: LogEvent if event.message.toString contains "TraceContext =>" ⇒
- val ctxInEvent = event.asInstanceOf[ContextAware].traceContext
+ val ctxInEvent = event.asInstanceOf[TraceContextAware].traceContext
ctxInEvent === testTraceContext
case event: LogEvent ⇒ false
@@ -46,6 +46,6 @@ class ActorLoggingSpec extends TestKit(ActorSystem("actor-logging-spec")) with W
class LoggerActor extends Actor with ActorLogging {
def receive = {
- case "info" ⇒ log.info("TraceContext => {}", Trace.context())
+ case "info" ⇒ log.info("TraceContext => {}", TraceRecorder.currentContext)
}
}
diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala
index f32623b9..acc939fb 100644
--- a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala
+++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorMessagePassingTracingSpec.scala
@@ -15,70 +15,71 @@
* ========================================================== */
package kamon.trace.instrumentation
-import org.scalatest.{ WordSpecLike, Matchers }
-import akka.actor.{ ActorRef, Actor, Props, ActorSystem }
+import org.scalatest.WordSpecLike
+import akka.actor.{ Actor, Props, ActorSystem }
import akka.testkit.{ ImplicitSender, TestKit }
-import kamon.trace.Trace
+import kamon.trace.TraceRecorder
import akka.pattern.{ pipe, ask }
import akka.util.Timeout
import scala.concurrent.duration._
-import scala.concurrent.{ Await, Future }
import akka.routing.RoundRobinRouter
-import kamon.trace.TraceContext
class ActorMessagePassingTracingSpec extends TestKit(ActorSystem("actor-message-passing-tracing-spec")) with WordSpecLike with ImplicitSender {
implicit val executionContext = system.dispatcher
"the message passing instrumentation" should {
- "propagate the TraceContext using bang" in new TraceContextEchoFixture {
- Trace.withContext(testTraceContext) {
+ "propagate the TraceContext using bang" in new EchoActorFixture {
+ val testTraceContext = TraceRecorder.withNewTraceContext("bang-reply") {
ctxEchoActor ! "test"
+ TraceRecorder.currentContext
}
expectMsg(testTraceContext)
}
- "propagate the TraceContext using tell" in new TraceContextEchoFixture {
- Trace.withContext(testTraceContext) {
+ "propagate the TraceContext using tell" in new EchoActorFixture {
+ val testTraceContext = TraceRecorder.withNewTraceContext("tell-reply") {
ctxEchoActor.tell("test", testActor)
+ TraceRecorder.currentContext
}
expectMsg(testTraceContext)
}
- "propagate the TraceContext using ask" in new TraceContextEchoFixture {
+ "propagate the TraceContext using ask" in new EchoActorFixture {
implicit val timeout = Timeout(1 seconds)
- Trace.withContext(testTraceContext) {
+ val testTraceContext = TraceRecorder.withNewTraceContext("ask-reply") {
// The pipe pattern use Futures internally, so FutureTracing test should cover the underpinnings of it.
(ctxEchoActor ? "test") pipeTo (testActor)
+ TraceRecorder.currentContext
}
expectMsg(testTraceContext)
}
- "propagate the TraceContext to actors behind a router" in new RoutedTraceContextEchoFixture {
- Trace.withContext(testTraceContext) {
+ "propagate the TraceContext to actors behind a router" in new RoutedEchoActorFixture {
+ val testTraceContext = TraceRecorder.withNewTraceContext("router-reply") {
ctxEchoActor ! "test"
+ TraceRecorder.currentContext
}
expectMsg(testTraceContext)
}
}
- trait TraceContextEchoFixture {
- val testTraceContext = Some(Trace.newTraceContext("test", "test-1"))
+ trait EchoActorFixture {
val ctxEchoActor = system.actorOf(Props[TraceContextEcho])
}
- trait RoutedTraceContextEchoFixture extends TraceContextEchoFixture {
+ trait RoutedEchoActorFixture extends EchoActorFixture {
override val ctxEchoActor = system.actorOf(Props[TraceContextEcho].withRouter(RoundRobinRouter(nrOfInstances = 1)))
}
}
class TraceContextEcho extends Actor {
def receive = {
- case msg: String ⇒ sender ! Trace.context()
+ case msg: String ⇒ sender ! TraceRecorder.currentContext
}
}
diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala
index 7d539370..00ecae79 100644
--- a/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala
+++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/ActorSystemMessagePassingInstrumentationSpec.scala
@@ -3,7 +3,7 @@ package kamon.trace.instrumentation
import akka.testkit.{ ImplicitSender, TestKit }
import akka.actor._
import org.scalatest.WordSpecLike
-import kamon.trace.Trace
+import kamon.trace.TraceRecorder
import scala.util.control.NonFatal
import akka.actor.SupervisorStrategy.{ Escalate, Stop, Restart, Resume }
import scala.concurrent.duration._
@@ -12,43 +12,44 @@ class ActorSystemMessagePassingInstrumentationSpec extends TestKit(ActorSystem("
implicit val executionContext = system.dispatcher
"the system message passing instrumentation" should {
- "keep the TraceContext while processing the Create message in top level actors" in new TraceContextFixture {
- Trace.withContext(testTraceContext) {
+ "keep the TraceContext while processing the Create message in top level actors" in {
+ val testTraceContext = TraceRecorder.withNewTraceContext("creating-top-level-actor") {
system.actorOf(Props(new Actor {
-
- testActor ! Trace.context()
-
+ testActor ! TraceRecorder.currentContext
def receive: Actor.Receive = { case any ⇒ }
}))
+
+ TraceRecorder.currentContext
}
expectMsg(testTraceContext)
}
- "keep the TraceContext while processing the Create message in non top level actors" in new TraceContextFixture {
- Trace.withContext(testTraceContext) {
+ "keep the TraceContext while processing the Create message in non top level actors" in {
+ val testTraceContext = TraceRecorder.withNewTraceContext("creating-non-top-level-actor") {
system.actorOf(Props(new Actor {
def receive: Actor.Receive = {
case any ⇒
context.actorOf(Props(new Actor {
-
- testActor ! Trace.context()
-
+ testActor ! TraceRecorder.currentContext
def receive: Actor.Receive = { case any ⇒ }
}))
}
})) ! "any"
+
+ TraceRecorder.currentContext
}
expectMsg(testTraceContext)
}
"keep the TraceContext in the supervision cycle" when {
- "the actor is resumed" in new TraceContextFixture {
+ "the actor is resumed" in {
val supervisor = supervisorWithDirective(Resume)
- Trace.withContext(testTraceContext) {
+ val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-resume") {
supervisor ! "fail"
+ TraceRecorder.currentContext
}
expectMsg(testTraceContext) // From the parent executing the supervision strategy
@@ -58,11 +59,12 @@ class ActorSystemMessagePassingInstrumentationSpec extends TestKit(ActorSystem("
expectMsg(None)
}
- "the actor is restarted" in new TraceContextFixture {
+ "the actor is restarted" in {
val supervisor = supervisorWithDirective(Restart, sendPreRestart = true, sendPostRestart = true)
- Trace.withContext(testTraceContext) {
+ val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-restart") {
supervisor ! "fail"
+ TraceRecorder.currentContext
}
expectMsg(testTraceContext) // From the parent executing the supervision strategy
@@ -74,11 +76,12 @@ class ActorSystemMessagePassingInstrumentationSpec extends TestKit(ActorSystem("
expectMsg(None)
}
- "the actor is stopped" in new TraceContextFixture {
+ "the actor is stopped" in {
val supervisor = supervisorWithDirective(Stop, sendPostStop = true)
- Trace.withContext(testTraceContext) {
+ val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-stop") {
supervisor ! "fail"
+ TraceRecorder.currentContext
}
expectMsg(testTraceContext) // From the parent executing the supervision strategy
@@ -86,11 +89,12 @@ class ActorSystemMessagePassingInstrumentationSpec extends TestKit(ActorSystem("
expectNoMsg(1 second)
}
- "the failure is escalated" in new TraceContextFixture {
+ "the failure is escalated" in {
val supervisor = supervisorWithDirective(Escalate, sendPostStop = true)
- Trace.withContext(testTraceContext) {
+ val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-escalate") {
supervisor ! "fail"
+ TraceRecorder.currentContext
}
expectMsg(testTraceContext) // From the parent executing the supervision strategy
@@ -108,7 +112,7 @@ class ActorSystemMessagePassingInstrumentationSpec extends TestKit(ActorSystem("
val child = context.actorOf(Props(new Parent))
override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
- case NonFatal(throwable) ⇒ testActor ! Trace.context(); Stop
+ case NonFatal(throwable) ⇒ testActor ! TraceRecorder.currentContext; Stop
}
def receive = {
@@ -120,7 +124,7 @@ class ActorSystemMessagePassingInstrumentationSpec extends TestKit(ActorSystem("
val child = context.actorOf(Props(new Child))
override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
- case NonFatal(throwable) ⇒ testActor ! Trace.context(); directive
+ case NonFatal(throwable) ⇒ testActor ! TraceRecorder.currentContext; directive
}
def receive: Actor.Receive = {
@@ -128,7 +132,7 @@ class ActorSystemMessagePassingInstrumentationSpec extends TestKit(ActorSystem("
}
override def postStop(): Unit = {
- if (sendPostStop) testActor ! Trace.context()
+ if (sendPostStop) testActor ! TraceRecorder.currentContext
super.postStop()
}
}
@@ -136,26 +140,26 @@ class ActorSystemMessagePassingInstrumentationSpec extends TestKit(ActorSystem("
class Child extends Actor {
def receive = {
case "fail" ⇒ 1 / 0
- case "context" ⇒ sender ! Trace.context()
+ case "context" ⇒ sender ! TraceRecorder.currentContext
}
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
- if (sendPreRestart) testActor ! Trace.context()
+ if (sendPreRestart) testActor ! TraceRecorder.currentContext
super.preRestart(reason, message)
}
override def postRestart(reason: Throwable): Unit = {
- if (sendPostRestart) testActor ! Trace.context()
+ if (sendPostRestart) testActor ! TraceRecorder.currentContext
super.postRestart(reason)
}
override def postStop(): Unit = {
- if (sendPostStop) testActor ! Trace.context()
+ if (sendPostStop) testActor ! TraceRecorder.currentContext
super.postStop()
}
override def preStart(): Unit = {
- if (sendPreStart) testActor ! Trace.context()
+ if (sendPreStart) testActor ! TraceRecorder.currentContext
super.preStart()
}
}
diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala
index 9df67391..0387386c 100644
--- a/kamon-core/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala
+++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/AskPatternTracingSpec.scala
@@ -22,32 +22,30 @@ import akka.event.Logging.Warning
import scala.concurrent.duration._
import akka.pattern.ask
import akka.util.Timeout
-import kamon.trace.{ Trace, ContextAware }
+import kamon.trace.{TraceContextAware, TraceRecorder}
import org.scalatest.OptionValues._
class AskPatternTracingSpec extends TestKit(ActorSystem("ask-pattern-tracing-spec")) with WordSpecLike with Matchers {
"the AskPatternTracing" should {
- "log a warning with a stack trace and TraceContext taken from the moment the ask was triggered" in new TraceContextFixture {
+ "log a warning with a stack trace and TraceContext taken from the moment the ask was triggered" in {
implicit val ec = system.dispatcher
implicit val timeout = Timeout(10 milliseconds)
val noReply = system.actorOf(Props[NoReply])
system.eventStream.subscribe(testActor, classOf[Warning])
- within(500 milliseconds) {
- val initialCtx = Trace.withContext(testTraceContext) {
- noReply ? "hello"
- Trace.context()
- }
-
- val warn = expectMsgPF() {
- case warn: Warning if warn.message.toString.contains("Timeout triggered for ask pattern") ⇒ warn
- }
- val capturedCtx = warn.asInstanceOf[ContextAware].traceContext
+ val testTraceContext = TraceRecorder.withNewTraceContext("ask-timeout-warning") {
+ noReply ? "hello"
+ TraceRecorder.currentContext
+ }
- capturedCtx should be('defined)
- capturedCtx should equal(initialCtx)
+ val warn = expectMsgPF() {
+ case warn: Warning if warn.message.toString.contains("Timeout triggered for ask pattern") ⇒ warn
}
+ val capturedCtx = warn.asInstanceOf[TraceContextAware].traceContext
+
+ capturedCtx should be('defined)
+ capturedCtx should equal(testTraceContext)
}
}
}
diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala
index a5554836..e6797148 100644
--- a/kamon-core/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala
+++ b/kamon-core/src/test/scala/kamon/trace/instrumentation/FutureTracingSpec.scala
@@ -15,42 +15,42 @@
* ========================================================== */
package kamon.trace.instrumentation
-import scala.concurrent.{ ExecutionContext, Await, Promise, Future }
-import org.scalatest.{ Matchers, OptionValues, WordSpec }
+import scala.concurrent.{ ExecutionContext, Future }
+import org.scalatest.{ Matchers, OptionValues, WordSpecLike }
import org.scalatest.concurrent.{ ScalaFutures, PatienceConfiguration }
-import java.util.UUID
-import scala.util.{ Random, Success }
-import scala.concurrent.duration._
-import java.util.concurrent.TimeUnit
-import akka.actor.{ Actor, ActorSystem }
-import kamon.trace.{ Trace, TraceContext }
+import kamon.trace.TraceRecorder
+import akka.testkit.TestKit
+import akka.actor.ActorSystem
-class FutureTracingSpec extends WordSpec with Matchers with ScalaFutures with PatienceConfiguration with OptionValues {
+class FutureTracingSpec extends TestKit(ActorSystem("actor-message-passing-tracing-spec")) with WordSpecLike with Matchers
+ with ScalaFutures with PatienceConfiguration with OptionValues {
- implicit val execContext = ExecutionContext.Implicits.global
+ implicit val execContext = system.dispatcher
"a Future created with FutureTracing" should {
"capture the TraceContext available when created" which {
- "must be available when executing the future's body" in new TraceContextFixture {
- var future: Future[Option[TraceContext]] = _
+ "must be available when executing the future's body" in {
- Trace.withContext(testTraceContext) {
- future = Future(Trace.context)
+ val (future, testTraceContext) = TraceRecorder.withNewTraceContext("future-body") {
+ val future = Future(TraceRecorder.currentContext)
+
+ (future, TraceRecorder.currentContext)
}
whenReady(future)(ctxInFuture ⇒
ctxInFuture should equal(testTraceContext))
}
- "must be available when executing callbacks on the future" in new TraceContextFixture {
- var future: Future[Option[TraceContext]] = _
+ "must be available when executing callbacks on the future" in {
- Trace.withContext(testTraceContext) {
- future = Future("Hello Kamon!")
+ val (future, testTraceContext) = TraceRecorder.withNewTraceContext("future-body") {
+ val future = Future("Hello Kamon!")
// The TraceContext is expected to be available during all intermediate processing.
.map(_.length)
.flatMap(len ⇒ Future(len.toString))
- .map(s ⇒ Trace.context())
+ .map(s ⇒ TraceRecorder.currentContext)
+
+ (future, TraceRecorder.currentContext)
}
whenReady(future)(ctxInFuture ⇒
diff --git a/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceContextFixture.scala b/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceContextFixture.scala
deleted file mode 100644
index 2df95d09..00000000
--- a/kamon-core/src/test/scala/kamon/trace/instrumentation/TraceContextFixture.scala
+++ /dev/null
@@ -1,10 +0,0 @@
-package kamon.trace.instrumentation
-
-import scala.util.Random
-import kamon.trace.TraceContext
-import akka.actor.Actor
-
-trait TraceContextFixture {
- val random = new Random(System.nanoTime)
- val testTraceContext = Some(TraceContext(Actor.noSender, random.nextInt, "test", "test-1"))
-} \ No newline at end of file
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala
index 668e29f7..5f736a90 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala
@@ -18,7 +18,7 @@ package kamon.newrelic
import akka.actor._
import scala.collection.mutable
import kamon.Kamon
-import kamon.trace.{ UowTrace, Trace }
+import kamon.trace.{ UowTrace }
import kamon.newrelic.NewRelicMetric.{ MetricBatch, FlushMetrics }
import scala.concurrent.duration._
@@ -33,7 +33,7 @@ class NewRelicExtension(system: ExtendedActorSystem) extends Kamon.Extension {
class NewRelicManager extends Actor with ActorLogging {
log.info("Registering the Kamon(NewRelic) extension")
- Kamon(Trace)(context.system).api ! Trace.Register
+ //Kamon(Trace)(context.system).api ! Trace.Register
val webTransactionMetrics = context.actorOf(Props[WebTransactionMetrics], "web-transaction-metrics")
val agent = context.actorOf(Props[Agent], "agent")
diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala
index f3a08755..9f458bb5 100644
--- a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala
+++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala
@@ -19,7 +19,7 @@ import akka.actor.Actor
import akka.event.Logging.Error
import akka.event.Logging.{ LoggerInitialized, InitializeLogger }
import com.newrelic.api.agent.NewRelic
-import kamon.trace.ContextAware
+import kamon.trace.TraceContextAware
class NewRelicErrorLogger extends Actor {
def receive = {
@@ -30,7 +30,7 @@ class NewRelicErrorLogger extends Actor {
def notifyError(error: Error): Unit = {
val params = new java.util.HashMap[String, String]()
- val ctx = error.asInstanceOf[ContextAware].traceContext
+ val ctx = error.asInstanceOf[TraceContextAware].traceContext
for (c ← ctx) {
params.put("TraceToken", c.token)
diff --git a/kamon-playground/src/main/resources/application.conf b/kamon-playground/src/main/resources/application.conf
index c0793fea..fb65721b 100644
--- a/kamon-playground/src/main/resources/application.conf
+++ b/kamon-playground/src/main/resources/application.conf
@@ -1,6 +1,6 @@
akka {
loggers = [ "akka.event.slf4j.Slf4jLogger" ]
- loglevel = INFO
+ loglevel = DEBUG
extensions = ["kamon.newrelic.NewRelic"]
@@ -11,6 +11,12 @@ akka {
}
}
+spray.can {
+ host-connector {
+ max-redirects = 10
+ }
+}
+
kamon {
newrelic {
app-name = "SimpleRequestProcessor"
diff --git a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala
index 868a1921..fd0f3e4b 100644
--- a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala
+++ b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala
@@ -21,10 +21,12 @@ import akka.util.Timeout
import spray.httpx.RequestBuilding
import scala.concurrent.{ Await, Future }
import kamon.spray.UowDirectives
-import kamon.trace.Trace
-import kamon.Kamon
import scala.util.Random
import akka.routing.RoundRobinRouter
+import kamon.trace.TraceRecorder
+import kamon.Kamon
+import kamon.metrics.{ ActorMetrics, TraceMetrics, Metrics }
+import spray.http.{ StatusCodes, Uri }
object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuilding with UowDirectives {
import scala.concurrent.duration._
@@ -34,11 +36,13 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil
implicit val system = ActorSystem("test")
import system.dispatcher
+ val printer = system.actorOf(Props[PrintWhatever])
+
val act = system.actorOf(Props(new Actor {
def receive: Actor.Receive = { case any ⇒ sender ! any }
}), "com")
- Thread.sleep(10000)
+ //Kamon(Metrics).subscribe(TraceMetrics, "*", printer, true)
implicit val timeout = Timeout(30 seconds)
@@ -55,19 +59,19 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil
Future.sequence(futures).map(l ⇒ "Ok")
}
}
- } ~ {
+ } ~
path("site") {
complete {
- pipeline(Get("http://localhost:4000/"))
+ pipeline(Get("http://localhost:9090/site-redirect"))
}
- }
- } ~
+ } ~
+ path("site-redirect") {
+ redirect(Uri("http://localhost:4000/"), StatusCodes.MovedPermanently)
+
+ } ~
path("reply" / Segment) { reqID ⇒
uow {
complete {
- if (Trace.context().isEmpty)
- println("ROUTE NO CONTEXT")
-
(replier ? reqID).mapTo[String]
}
}
@@ -100,6 +104,12 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil
}
+class PrintWhatever extends Actor {
+ def receive = {
+ case anything ⇒ println(anything)
+ }
+}
+
object Verifier extends App {
def go: Unit = {
@@ -124,7 +134,7 @@ object Verifier extends App {
class Replier extends Actor with ActorLogging {
def receive = {
case anything ⇒
- if (Trace.context.isEmpty)
+ if (TraceRecorder.currentContext.isEmpty)
log.warning("PROCESSING A MESSAGE WITHOUT CONTEXT")
log.info("Processing at the Replier, and self is: {}", self)
diff --git a/kamon-spray/src/main/scala/kamon/spray/Spray.scala b/kamon-spray/src/main/scala/kamon/spray/Spray.scala
new file mode 100644
index 00000000..4dc98d85
--- /dev/null
+++ b/kamon-spray/src/main/scala/kamon/spray/Spray.scala
@@ -0,0 +1,37 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.spray
+
+import akka.actor.{ ExtendedActorSystem, ExtensionIdProvider, ExtensionId }
+import akka.actor
+import kamon.Kamon
+import spray.http.HttpRequest
+
+object Spray extends ExtensionId[SprayExtension] with ExtensionIdProvider {
+ def lookup(): ExtensionId[_ <: actor.Extension] = Spray
+ def createExtension(system: ExtendedActorSystem): SprayExtension = new SprayExtension(system)
+}
+
+class SprayExtension(private val system: ExtendedActorSystem) extends Kamon.Extension {
+ private val config = system.settings.config.getConfig("kamon.spray")
+
+ val includeTraceToken: Boolean = config.getBoolean("include-trace-token-header")
+ val traceTokenHeaderName: String = config.getString("trace-token-header-name")
+
+ // Later we should expose a way for the user to customize this.
+ def assignHttpClientRequestName(request: HttpRequest): String = request.uri.authority.host.address
+}
diff --git a/kamon-spray/src/main/scala/kamon/spray/UowDirectives.scala b/kamon-spray/src/main/scala/kamon/spray/UowDirectives.scala
index 78a5b336..56cc6d5e 100644
--- a/kamon-spray/src/main/scala/kamon/spray/UowDirectives.scala
+++ b/kamon-spray/src/main/scala/kamon/spray/UowDirectives.scala
@@ -20,7 +20,6 @@ import spray.routing._
import java.util.concurrent.atomic.AtomicLong
import scala.util.Try
import java.net.InetAddress
-import kamon.trace.Trace
import spray.http.HttpHeaders.RawHeader
trait UowDirectives extends BasicDirectives {
@@ -28,10 +27,10 @@ trait UowDirectives extends BasicDirectives {
val uowHeader = request.headers.find(_.name == "X-UOW")
val generatedUow = uowHeader.map(_.value).getOrElse(UowDirectives.newUow)
- Trace.transformContext(_.copy(token = generatedUow))
+ //Trace.transformContext(_.copy(token = generatedUow))
request
}
- def respondWithUow = mapHttpResponseHeaders(headers ⇒ Trace.context().map(ctx ⇒ RawHeader("X-UOW", ctx.token) :: headers).getOrElse(headers))
+ //def respondWithUow = mapHttpResponseHeaders(headers ⇒ Trace.context().map(ctx ⇒ RawHeader("X-UOW", ctx.token) :: headers).getOrElse(headers))
}
object UowDirectives {
diff --git a/kamon-spray/src/main/scala/spray/can/client/ClientRequestTracing.scala b/kamon-spray/src/main/scala/spray/can/client/ClientRequestTracing.scala
index a1505a63..c74019dd 100644
--- a/kamon-spray/src/main/scala/spray/can/client/ClientRequestTracing.scala
+++ b/kamon-spray/src/main/scala/spray/can/client/ClientRequestTracing.scala
@@ -20,58 +20,60 @@ import org.aspectj.lang.annotation._
import org.aspectj.lang.ProceedingJoinPoint
import spray.http.{ HttpMessageEnd, HttpRequest }
import spray.http.HttpHeaders.Host
-import kamon.trace.{ TraceContext, Trace, Segments }
-import kamon.trace.Segments.{ ContextAndSegmentCompletionAware, HttpClientRequest }
-import kamon.trace.Trace.SegmentCompletionHandle
+import kamon.trace.{ TraceRecorder, SegmentCompletionHandleAware, TraceContextAware }
+import kamon.metrics.TraceMetrics.HttpClientRequest
+import kamon.Kamon
+import kamon.spray.Spray
@Aspect
class ClientRequestTracing {
@DeclareMixin("spray.can.client.HttpHostConnector.RequestContext")
- def mixin: ContextAndSegmentCompletionAware = new ContextAndSegmentCompletionAware {
- val traceContext: Option[TraceContext] = Trace.context()
- var completionHandle: Option[SegmentCompletionHandle] = None
- }
+ def mixin: SegmentCompletionHandleAware = SegmentCompletionHandleAware.default
@Pointcut("execution(spray.can.client.HttpHostConnector.RequestContext.new(..)) && this(ctx) && args(request, *, *, *)")
- def requestContextCreation(ctx: ContextAndSegmentCompletionAware, request: HttpRequest): Unit = {}
+ def requestContextCreation(ctx: SegmentCompletionHandleAware, request: HttpRequest): Unit = {}
@After("requestContextCreation(ctx, request)")
- def afterRequestContextCreation(ctx: ContextAndSegmentCompletionAware, request: HttpRequest): Unit = {
+ def afterRequestContextCreation(ctx: SegmentCompletionHandleAware, request: HttpRequest): Unit = {
// The RequestContext will be copied when a request needs to be retried but we are only interested in creating the
// completion handle the first time we create one.
// The read to ctx.completionHandle should take care of initializing the aspect timely.
- if (ctx.completionHandle.isEmpty) {
- val requestAttributes = Map[String, String](
- "host" -> request.header[Host].map(_.value).getOrElse("unknown"),
- "path" -> request.uri.path.toString(),
- "method" -> request.method.toString())
- val completionHandle = Trace.startSegment(category = HttpClientRequest, attributes = requestAttributes)
- ctx.completionHandle = Some(completionHandle)
+ if (ctx.segmentCompletionHandle.isEmpty) {
+ TraceRecorder.currentContext.map { traceContext ⇒
+ val requestAttributes = Map[String, String](
+ "host" -> request.header[Host].map(_.value).getOrElse("unknown"),
+ "path" -> request.uri.path.toString(),
+ "method" -> request.method.toString())
+
+ val clientRequestName = Kamon(Spray)(traceContext.system).assignHttpClientRequestName(request)
+ val completionHandle = traceContext.startSegment(HttpClientRequest(clientRequestName), requestAttributes)
+ ctx.segmentCompletionHandle = Some(completionHandle)
+ }
}
}
@Pointcut("execution(* spray.can.client.HttpHostConnector.RequestContext.copy(..)) && this(old)")
- def copyingRequestContext(old: ContextAndSegmentCompletionAware): Unit = {}
+ def copyingRequestContext(old: SegmentCompletionHandleAware): Unit = {}
@Around("copyingRequestContext(old)")
- def aroundCopyingRequestContext(pjp: ProceedingJoinPoint, old: ContextAndSegmentCompletionAware) = {
- Trace.withContext(old.traceContext) {
+ def aroundCopyingRequestContext(pjp: ProceedingJoinPoint, old: SegmentCompletionHandleAware): Any = {
+ TraceRecorder.withTraceContext(old.traceContext) {
pjp.proceed()
}
}
@Pointcut("execution(* spray.can.client.HttpHostConnectionSlot.dispatchToCommander(..)) && args(requestContext, message)")
- def dispatchToCommander(requestContext: ContextAndSegmentCompletionAware, message: Any): Unit = {}
+ def dispatchToCommander(requestContext: SegmentCompletionHandleAware, message: Any): Unit = {}
@Around("dispatchToCommander(requestContext, message)")
- def aroundDispatchToCommander(pjp: ProceedingJoinPoint, requestContext: ContextAndSegmentCompletionAware, message: Any) = {
+ def aroundDispatchToCommander(pjp: ProceedingJoinPoint, requestContext: SegmentCompletionHandleAware, message: Any) = {
requestContext.traceContext match {
case ctx @ Some(_) ⇒
- Trace.withContext(ctx) {
+ TraceRecorder.withTraceContext(ctx) {
if (message.isInstanceOf[HttpMessageEnd])
- requestContext.completionHandle.map(_.complete(Segments.End()))
+ requestContext.segmentCompletionHandle.map(_.finish(Map.empty))
pjp.proceed()
}
@@ -80,4 +82,4 @@ class ClientRequestTracing {
}
}
-} \ No newline at end of file
+}
diff --git a/kamon-spray/src/main/scala/spray/can/server/ServerRequestTracing.scala b/kamon-spray/src/main/scala/spray/can/server/ServerRequestTracing.scala
index f0e52d12..b7479d2b 100644
--- a/kamon-spray/src/main/scala/spray/can/server/ServerRequestTracing.scala
+++ b/kamon-spray/src/main/scala/spray/can/server/ServerRequestTracing.scala
@@ -16,37 +16,34 @@
package spray.can.server
import org.aspectj.lang.annotation._
-import kamon.trace.{ Trace, ContextAware }
-import spray.http.HttpRequest
+import kamon.trace.{ TraceRecorder, TraceContextAware }
import akka.actor.ActorSystem
-import akka.event.Logging.Warning
-import org.aspectj.lang.ProceedingJoinPoint
import spray.http.HttpRequest
import akka.event.Logging.Warning
import scala.Some
+import kamon.Kamon
+import kamon.spray.Spray
@Aspect
class ServerRequestTracing {
@DeclareMixin("spray.can.server.OpenRequestComponent.DefaultOpenRequest")
- def mixinContextAwareToOpenRequest: ContextAware = ContextAware.default
+ def mixinContextAwareToOpenRequest: TraceContextAware = TraceContextAware.default
@Pointcut("execution(spray.can.server.OpenRequestComponent$DefaultOpenRequest.new(..)) && this(openRequest) && args(*, request, *, *)")
- def openRequestInit(openRequest: ContextAware, request: HttpRequest): Unit = {}
+ def openRequestInit(openRequest: TraceContextAware, request: HttpRequest): Unit = {}
@After("openRequestInit(openRequest, request)")
- def afterInit(openRequest: ContextAware, request: HttpRequest): Unit = {
+ def afterInit(openRequest: TraceContextAware, request: HttpRequest): Unit = {
val system: ActorSystem = openRequest.asInstanceOf[OpenRequest].context.actorContext.system
- val config = system.settings.config.getConfig("kamon.spray")
-
- val token = if (config.getBoolean("include-trace-token-header")) {
- val traceTokenHeader = config.getString("trace-token-header-name")
- request.headers.find(_.name == traceTokenHeader).map(_.value)
- } else None
+ val sprayExtension = Kamon(Spray)(system)
val defaultTraceName: String = request.method.value + ": " + request.uri.path
+ val token = if (sprayExtension.includeTraceToken) {
+ request.headers.find(_.name == sprayExtension.traceTokenHeaderName).map(_.value)
+ } else None
- Trace.start(defaultTraceName, token)(system)
+ TraceRecorder.start(defaultTraceName, token)(system)
// Necessary to force initialization of traceContext when initiating the request.
openRequest.traceContext
@@ -57,26 +54,26 @@ class ServerRequestTracing {
@After("openNewRequest()")
def afterOpenNewRequest(): Unit = {
- Trace.clear
+ TraceRecorder.clearContext
}
@Pointcut("execution(* spray.can.server.OpenRequestComponent$DefaultOpenRequest.handleResponseEndAndReturnNextOpenRequest(..)) && target(openRequest)")
- def openRequestCreation(openRequest: ContextAware): Unit = {}
+ def openRequestCreation(openRequest: TraceContextAware): Unit = {}
@After("openRequestCreation(openRequest)")
- def afterFinishingRequest(openRequest: ContextAware): Unit = {
+ def afterFinishingRequest(openRequest: TraceContextAware): Unit = {
val storedContext = openRequest.traceContext
- val incomingContext = Trace.finish()
+ val incomingContext = TraceRecorder.currentContext
+ TraceRecorder.finish()
for (original ← storedContext) {
incomingContext match {
- case Some(incoming) if original.id != incoming.id ⇒
+ case Some(incoming) if original.token != incoming.token ⇒
publishWarning(s"Different ids when trying to close a Trace, original: [$original] - incoming: [$incoming]")
case Some(_) ⇒ // nothing to do here.
case None ⇒
- original.finish
publishWarning(s"Trace context not present while closing the Trace: [$original]")
}
}
diff --git a/kamon-spray/src/test/scala/kamon/spray/ClientRequestTracingSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ClientRequestTracingSpec.scala
index e204cd02..1ed3a787 100644
--- a/kamon-spray/src/test/scala/kamon/spray/ClientRequestTracingSpec.scala
+++ b/kamon-spray/src/test/scala/kamon/spray/ClientRequestTracingSpec.scala
@@ -5,7 +5,6 @@ import akka.actor.ActorSystem
import org.scalatest.WordSpecLike
import spray.httpx.RequestBuilding
import spray.client.pipelining._
-import kamon.trace.{ UowTrace, Trace }
import scala.concurrent.Await
class ClientRequestTracingSpec extends TestKit(ActorSystem("server-request-tracing-spec")) with WordSpecLike with RequestBuilding with TestServer {
@@ -20,10 +19,10 @@ class ClientRequestTracingSpec extends TestKit(ActorSystem("server-request-traci
Get(s"http://127.0.0.1:$port/ok")
// We don't care about the response, just make sure we finish the Trace after the response has been received.
- } map (rsp ⇒ Trace.finish())*/
+ } map (rsp ⇒ Trace.finish())
val trace = expectMsgType[UowTrace]
- println(trace.segments)
+ println(trace.segments)*/
}
}
diff --git a/kamon-spray/src/test/scala/kamon/spray/ServerRequestTracingSpec.scala b/kamon-spray/src/test/scala/kamon/spray/ServerRequestTracingSpec.scala
index 904047ed..d14ea6b1 100644
--- a/kamon-spray/src/test/scala/kamon/spray/ServerRequestTracingSpec.scala
+++ b/kamon-spray/src/test/scala/kamon/spray/ServerRequestTracingSpec.scala
@@ -24,7 +24,7 @@ import scala.concurrent.Await
import scala.concurrent.duration._
import _root_.spray.client.pipelining._
import akka.util.Timeout
-import kamon.trace.{ UowTrace, Trace }
+import kamon.trace.{ UowTrace }
import kamon.Kamon
import org.scalatest.concurrent.{ PatienceConfiguration, ScalaFutures }
import spray.http.HttpHeaders.RawHeader