From 9e52aad6b02da72ca28d52d0c94e2e8784e7aa65 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Mon, 14 Mar 2016 23:25:47 +0100 Subject: remove the context propagation configuration, we will always propagate. --- .../src/main/scala/kamon/akka/AkkaExtension.scala | 19 +--- .../kamon/akka/instrumentation/ActorMonitor.scala | 105 ++++++++++----------- 2 files changed, 50 insertions(+), 74 deletions(-) (limited to 'kamon-akka/src/main/scala') diff --git a/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala b/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala index 95bfc64e..bc846a1d 100644 --- a/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala +++ b/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala @@ -23,7 +23,6 @@ object AkkaExtension { private val akkaConfig = Kamon.config.getConfig("kamon.akka") val askPatternTimeoutWarning = AskPatternTimeoutWarningSettings.fromConfig(akkaConfig) - val traceContextPropagation = TraceContextPropagationSettings.fromConfig(akkaConfig) } sealed trait AskPatternTimeoutWarningSetting @@ -39,20 +38,4 @@ object AskPatternTimeoutWarningSettings { case "heavyweight" ⇒ Heavyweight case other ⇒ sys.error(s"Unrecognized option [$other] for the kamon.akka.ask-pattern-timeout-warning config.") } -} - -sealed trait TraceContextPropagationSetting -object TraceContextPropagationSettings { - case object Off extends TraceContextPropagationSetting - case object MonitoredActorsOnly extends TraceContextPropagationSetting - case object Always extends TraceContextPropagationSetting - - def fromConfig(config: Config): TraceContextPropagationSetting = - config.getString("automatic-trace-context-propagation") match { - case "off" ⇒ Off - case "monitored-actors-only" ⇒ MonitoredActorsOnly - case "always" ⇒ Always - case other ⇒ sys.error(s"Unrecognized option [$other] for the kamon.akka.automatic-trace-context-propagation config.") - } -} - +} \ No newline at end of file diff --git a/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorMonitor.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorMonitor.scala index a2b920a2..6bbefc6f 100644 --- a/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorMonitor.scala +++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorMonitor.scala @@ -1,23 +1,19 @@ package akka.kamon.instrumentation import akka.actor.{ Cell, ActorRef, ActorSystem } -import akka.kamon.instrumentation.ActorMonitors.{ TrackedRouteeWithContextPropagation, TrackedRoutee, ContextPropagation, TrackedActor } +import akka.kamon.instrumentation.ActorMonitors.{ TrackedRoutee, TrackedActor } import kamon.Kamon -import kamon.akka.TraceContextPropagationSettings.{ Always, MonitoredActorsOnly, Off } -import kamon.akka.{ AkkaExtension, RouterMetrics, ActorMetrics } +import kamon.akka.{ RouterMetrics, ActorMetrics } import kamon.metric.Entity import kamon.trace.{ TraceContext, EmptyTraceContext, Tracer } import kamon.util.RelativeNanoTimestamp import org.aspectj.lang.ProceedingJoinPoint trait ActorMonitor { - def captureEnvelopeContext(): EnvelopeContext = EnvelopeContext(captureTimestamp, captureTraceContext) - def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = pjp.proceed() - def processFailure(failure: Throwable): Unit = {} - def cleanup(): Unit = {} - - protected def captureTimestamp: RelativeNanoTimestamp = RelativeNanoTimestamp.zero - protected def captureTraceContext: TraceContext = EmptyTraceContext + def captureEnvelopeContext(): EnvelopeContext + def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef + def processFailure(failure: Throwable): Unit + def cleanup(): Unit } object ActorMonitor { @@ -38,45 +34,56 @@ object ActorMonitor { def createRegularActorMonitor(cellInfo: CellInfo): ActorMonitor = { def actorMetrics = Kamon.metrics.entity(ActorMetrics, cellInfo.entity) - AkkaExtension.traceContextPropagation match { - case Off if cellInfo.isTracked ⇒ new TrackedActor(cellInfo.entity, actorMetrics) - case Off ⇒ ActorMonitors.NoOp - case MonitoredActorsOnly if cellInfo.isTracked ⇒ new TrackedActor(cellInfo.entity, actorMetrics) with ContextPropagation - case MonitoredActorsOnly ⇒ ActorMonitors.NoOp - case Always if cellInfo.isTracked ⇒ new TrackedActor(cellInfo.entity, actorMetrics) with ContextPropagation - case Always ⇒ ActorMonitors.ContextPropagationOnly - } + if (cellInfo.isTracked) + new TrackedActor(cellInfo.entity, actorMetrics) + else ActorMonitors.ContextPropagationOnly } def createRouteeMonitor(cellInfo: CellInfo): ActorMonitor = { def routerMetrics = Kamon.metrics.entity(RouterMetrics, cellInfo.entity) - AkkaExtension.traceContextPropagation match { - case Off if cellInfo.isTracked ⇒ new TrackedRoutee(cellInfo.entity, routerMetrics) - case Off ⇒ ActorMonitors.NoOp - case MonitoredActorsOnly if cellInfo.isTracked ⇒ new TrackedRouteeWithContextPropagation(cellInfo.entity, routerMetrics) - case MonitoredActorsOnly ⇒ ActorMonitors.NoOp - case Always if cellInfo.isTracked ⇒ new TrackedRouteeWithContextPropagation(cellInfo.entity, routerMetrics) - case Always ⇒ ActorMonitors.ContextPropagationOnly - } + if (cellInfo.isTracked) + new TrackedRoutee(cellInfo.entity, routerMetrics) + else ActorMonitors.ContextPropagationOnly } } object ActorMonitors { - val NoOp = new ActorMonitor {} - val ContextPropagationOnly = new ActorMonitor with ContextPropagation + val NoOp = new ActorMonitor { + override def captureEnvelopeContext(): EnvelopeContext = EnvelopeContext(RelativeNanoTimestamp.zero, EmptyTraceContext) + override def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = pjp.proceed() + override def processFailure(failure: Throwable): Unit = {} + override def cleanup(): Unit = {} + } + + val ContextPropagationOnly = new ActorMonitor { + def captureEnvelopeContext(): EnvelopeContext = + EnvelopeContext(RelativeNanoTimestamp.now, Tracer.currentContext) + + def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = { + Tracer.withContext(envelopeContext.context) { + pjp.proceed() + } + } + + def processFailure(failure: Throwable): Unit = {} + def cleanup(): Unit = {} + + } class TrackedActor(val entity: Entity, actorMetrics: ActorMetrics) extends ActorMonitor { - override def captureEnvelopeContext(): EnvelopeContext = { + def captureEnvelopeContext(): EnvelopeContext = { actorMetrics.mailboxSize.increment() - super.captureEnvelopeContext() + EnvelopeContext(RelativeNanoTimestamp.now, Tracer.currentContext) } - override def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = { + def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = { val timestampBeforeProcessing = RelativeNanoTimestamp.now try { - super.processMessage(pjp, envelopeContext) + Tracer.withContext(envelopeContext.context) { + pjp.proceed() + } } finally { val timestampAfterProcessing = RelativeNanoTimestamp.now @@ -89,18 +96,21 @@ object ActorMonitors { } } - override def processFailure(failure: Throwable): Unit = actorMetrics.errors.increment() - override def captureTimestamp: RelativeNanoTimestamp = RelativeNanoTimestamp.now - override def cleanup(): Unit = Kamon.metrics.removeEntity(entity) + def processFailure(failure: Throwable): Unit = actorMetrics.errors.increment() + def cleanup(): Unit = Kamon.metrics.removeEntity(entity) } class TrackedRoutee(val entity: Entity, routerMetrics: RouterMetrics) extends ActorMonitor { + def captureEnvelopeContext(): EnvelopeContext = + EnvelopeContext(RelativeNanoTimestamp.now, Tracer.currentContext) - override def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = { + def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = { val timestampBeforeProcessing = RelativeNanoTimestamp.now try { - super.processMessage(pjp, envelopeContext) + Tracer.withContext(envelopeContext.context) { + pjp.proceed() + } } finally { val timestampAfterProcessing = RelativeNanoTimestamp.now @@ -112,24 +122,7 @@ object ActorMonitors { } } - override def processFailure(failure: Throwable): Unit = routerMetrics.errors.increment() - override def captureTimestamp: RelativeNanoTimestamp = RelativeNanoTimestamp.now - override def cleanup(): Unit = {} + def processFailure(failure: Throwable): Unit = routerMetrics.errors.increment() + def cleanup(): Unit = {} } - - trait ContextPropagation extends ActorMonitor { - override protected def captureTraceContext: TraceContext = Tracer.currentContext - - override def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = { - Tracer.withContext(envelopeContext.context) { - super.processMessage(pjp, envelopeContext) - } - } - } - - class TrackedActorWithContextPropagation(entity: Entity, actorMetrics: ActorMetrics) - extends TrackedActor(entity, actorMetrics) with ContextPropagation - - class TrackedRouteeWithContextPropagation(entity: Entity, routerMetrics: RouterMetrics) - extends TrackedRoutee(entity, routerMetrics) with ContextPropagation } \ No newline at end of file -- cgit v1.2.3