aboutsummaryrefslogtreecommitdiff
path: root/kamon-akka/src/main/scala/kamon/akka/instrumentation
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2016-03-14 23:25:47 +0100
committerIvan Topolnjak <ivantopo@gmail.com>2016-03-14 23:25:47 +0100
commit9e52aad6b02da72ca28d52d0c94e2e8784e7aa65 (patch)
tree7a6e28476bc76348fe22885969f586df4d59c720 /kamon-akka/src/main/scala/kamon/akka/instrumentation
parent3566482d061248ff01882fa9647ae2d65677d5ed (diff)
downloadKamon-9e52aad6b02da72ca28d52d0c94e2e8784e7aa65.tar.gz
Kamon-9e52aad6b02da72ca28d52d0c94e2e8784e7aa65.tar.bz2
Kamon-9e52aad6b02da72ca28d52d0c94e2e8784e7aa65.zip
remove the context propagation configuration, we will always propagate.
Diffstat (limited to 'kamon-akka/src/main/scala/kamon/akka/instrumentation')
-rw-r--r--kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorMonitor.scala105
1 files changed, 49 insertions, 56 deletions
diff --git a/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorMonitor.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorMonitor.scala
index a2b920a2..6bbefc6f 100644
--- a/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorMonitor.scala
+++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorMonitor.scala
@@ -1,23 +1,19 @@
package akka.kamon.instrumentation
import akka.actor.{ Cell, ActorRef, ActorSystem }
-import akka.kamon.instrumentation.ActorMonitors.{ TrackedRouteeWithContextPropagation, TrackedRoutee, ContextPropagation, TrackedActor }
+import akka.kamon.instrumentation.ActorMonitors.{ TrackedRoutee, TrackedActor }
import kamon.Kamon
-import kamon.akka.TraceContextPropagationSettings.{ Always, MonitoredActorsOnly, Off }
-import kamon.akka.{ AkkaExtension, RouterMetrics, ActorMetrics }
+import kamon.akka.{ RouterMetrics, ActorMetrics }
import kamon.metric.Entity
import kamon.trace.{ TraceContext, EmptyTraceContext, Tracer }
import kamon.util.RelativeNanoTimestamp
import org.aspectj.lang.ProceedingJoinPoint
trait ActorMonitor {
- def captureEnvelopeContext(): EnvelopeContext = EnvelopeContext(captureTimestamp, captureTraceContext)
- def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = pjp.proceed()
- def processFailure(failure: Throwable): Unit = {}
- def cleanup(): Unit = {}
-
- protected def captureTimestamp: RelativeNanoTimestamp = RelativeNanoTimestamp.zero
- protected def captureTraceContext: TraceContext = EmptyTraceContext
+ def captureEnvelopeContext(): EnvelopeContext
+ def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef
+ def processFailure(failure: Throwable): Unit
+ def cleanup(): Unit
}
object ActorMonitor {
@@ -38,45 +34,56 @@ object ActorMonitor {
def createRegularActorMonitor(cellInfo: CellInfo): ActorMonitor = {
def actorMetrics = Kamon.metrics.entity(ActorMetrics, cellInfo.entity)
- AkkaExtension.traceContextPropagation match {
- case Off if cellInfo.isTracked ⇒ new TrackedActor(cellInfo.entity, actorMetrics)
- case Off ⇒ ActorMonitors.NoOp
- case MonitoredActorsOnly if cellInfo.isTracked ⇒ new TrackedActor(cellInfo.entity, actorMetrics) with ContextPropagation
- case MonitoredActorsOnly ⇒ ActorMonitors.NoOp
- case Always if cellInfo.isTracked ⇒ new TrackedActor(cellInfo.entity, actorMetrics) with ContextPropagation
- case Always ⇒ ActorMonitors.ContextPropagationOnly
- }
+ if (cellInfo.isTracked)
+ new TrackedActor(cellInfo.entity, actorMetrics)
+ else ActorMonitors.ContextPropagationOnly
}
def createRouteeMonitor(cellInfo: CellInfo): ActorMonitor = {
def routerMetrics = Kamon.metrics.entity(RouterMetrics, cellInfo.entity)
- AkkaExtension.traceContextPropagation match {
- case Off if cellInfo.isTracked ⇒ new TrackedRoutee(cellInfo.entity, routerMetrics)
- case Off ⇒ ActorMonitors.NoOp
- case MonitoredActorsOnly if cellInfo.isTracked ⇒ new TrackedRouteeWithContextPropagation(cellInfo.entity, routerMetrics)
- case MonitoredActorsOnly ⇒ ActorMonitors.NoOp
- case Always if cellInfo.isTracked ⇒ new TrackedRouteeWithContextPropagation(cellInfo.entity, routerMetrics)
- case Always ⇒ ActorMonitors.ContextPropagationOnly
- }
+ if (cellInfo.isTracked)
+ new TrackedRoutee(cellInfo.entity, routerMetrics)
+ else ActorMonitors.ContextPropagationOnly
}
}
object ActorMonitors {
- val NoOp = new ActorMonitor {}
- val ContextPropagationOnly = new ActorMonitor with ContextPropagation
+ val NoOp = new ActorMonitor {
+ override def captureEnvelopeContext(): EnvelopeContext = EnvelopeContext(RelativeNanoTimestamp.zero, EmptyTraceContext)
+ override def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = pjp.proceed()
+ override def processFailure(failure: Throwable): Unit = {}
+ override def cleanup(): Unit = {}
+ }
+
+ val ContextPropagationOnly = new ActorMonitor {
+ def captureEnvelopeContext(): EnvelopeContext =
+ EnvelopeContext(RelativeNanoTimestamp.now, Tracer.currentContext)
+
+ def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = {
+ Tracer.withContext(envelopeContext.context) {
+ pjp.proceed()
+ }
+ }
+
+ def processFailure(failure: Throwable): Unit = {}
+ def cleanup(): Unit = {}
+
+ }
class TrackedActor(val entity: Entity, actorMetrics: ActorMetrics) extends ActorMonitor {
- override def captureEnvelopeContext(): EnvelopeContext = {
+ def captureEnvelopeContext(): EnvelopeContext = {
actorMetrics.mailboxSize.increment()
- super.captureEnvelopeContext()
+ EnvelopeContext(RelativeNanoTimestamp.now, Tracer.currentContext)
}
- override def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = {
+ def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = {
val timestampBeforeProcessing = RelativeNanoTimestamp.now
try {
- super.processMessage(pjp, envelopeContext)
+ Tracer.withContext(envelopeContext.context) {
+ pjp.proceed()
+ }
} finally {
val timestampAfterProcessing = RelativeNanoTimestamp.now
@@ -89,18 +96,21 @@ object ActorMonitors {
}
}
- override def processFailure(failure: Throwable): Unit = actorMetrics.errors.increment()
- override def captureTimestamp: RelativeNanoTimestamp = RelativeNanoTimestamp.now
- override def cleanup(): Unit = Kamon.metrics.removeEntity(entity)
+ def processFailure(failure: Throwable): Unit = actorMetrics.errors.increment()
+ def cleanup(): Unit = Kamon.metrics.removeEntity(entity)
}
class TrackedRoutee(val entity: Entity, routerMetrics: RouterMetrics) extends ActorMonitor {
+ def captureEnvelopeContext(): EnvelopeContext =
+ EnvelopeContext(RelativeNanoTimestamp.now, Tracer.currentContext)
- override def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = {
+ def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = {
val timestampBeforeProcessing = RelativeNanoTimestamp.now
try {
- super.processMessage(pjp, envelopeContext)
+ Tracer.withContext(envelopeContext.context) {
+ pjp.proceed()
+ }
} finally {
val timestampAfterProcessing = RelativeNanoTimestamp.now
@@ -112,24 +122,7 @@ object ActorMonitors {
}
}
- override def processFailure(failure: Throwable): Unit = routerMetrics.errors.increment()
- override def captureTimestamp: RelativeNanoTimestamp = RelativeNanoTimestamp.now
- override def cleanup(): Unit = {}
+ def processFailure(failure: Throwable): Unit = routerMetrics.errors.increment()
+ def cleanup(): Unit = {}
}
-
- trait ContextPropagation extends ActorMonitor {
- override protected def captureTraceContext: TraceContext = Tracer.currentContext
-
- override def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = {
- Tracer.withContext(envelopeContext.context) {
- super.processMessage(pjp, envelopeContext)
- }
- }
- }
-
- class TrackedActorWithContextPropagation(entity: Entity, actorMetrics: ActorMetrics)
- extends TrackedActor(entity, actorMetrics) with ContextPropagation
-
- class TrackedRouteeWithContextPropagation(entity: Entity, routerMetrics: RouterMetrics)
- extends TrackedRoutee(entity, routerMetrics) with ContextPropagation
} \ No newline at end of file