aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2016-03-14 23:25:47 +0100
committerIvan Topolnjak <ivantopo@gmail.com>2016-03-14 23:25:47 +0100
commit9e52aad6b02da72ca28d52d0c94e2e8784e7aa65 (patch)
tree7a6e28476bc76348fe22885969f586df4d59c720
parent3566482d061248ff01882fa9647ae2d65677d5ed (diff)
downloadKamon-9e52aad6b02da72ca28d52d0c94e2e8784e7aa65.tar.gz
Kamon-9e52aad6b02da72ca28d52d0c94e2e8784e7aa65.tar.bz2
Kamon-9e52aad6b02da72ca28d52d0c94e2e8784e7aa65.zip
remove the context propagation configuration, we will always propagate.
-rw-r--r--kamon-akka/src/main/resources/reference.conf3
-rw-r--r--kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala19
-rw-r--r--kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorMonitor.scala105
3 files changed, 50 insertions, 77 deletions
diff --git a/kamon-akka/src/main/resources/reference.conf b/kamon-akka/src/main/resources/reference.conf
index a5056f0c..c765c72b 100644
--- a/kamon-akka/src/main/resources/reference.conf
+++ b/kamon-akka/src/main/resources/reference.conf
@@ -11,9 +11,6 @@ kamon {
# - lightweight: logs the warning when a timeout is reached using org.aspectj.lang.reflect.SourceLocation.
# - heavyweight: logs the warning when a timeout is reached using a stack trace captured at the moment the future was created.
ask-pattern-timeout-warning = off
-
- automatic-trace-context-propagation = always
-
}
actor-group {
diff --git a/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala b/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala
index 95bfc64e..bc846a1d 100644
--- a/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala
+++ b/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala
@@ -23,7 +23,6 @@ object AkkaExtension {
private val akkaConfig = Kamon.config.getConfig("kamon.akka")
val askPatternTimeoutWarning = AskPatternTimeoutWarningSettings.fromConfig(akkaConfig)
- val traceContextPropagation = TraceContextPropagationSettings.fromConfig(akkaConfig)
}
sealed trait AskPatternTimeoutWarningSetting
@@ -39,20 +38,4 @@ object AskPatternTimeoutWarningSettings {
case "heavyweight" ⇒ Heavyweight
case other ⇒ sys.error(s"Unrecognized option [$other] for the kamon.akka.ask-pattern-timeout-warning config.")
}
-}
-
-sealed trait TraceContextPropagationSetting
-object TraceContextPropagationSettings {
- case object Off extends TraceContextPropagationSetting
- case object MonitoredActorsOnly extends TraceContextPropagationSetting
- case object Always extends TraceContextPropagationSetting
-
- def fromConfig(config: Config): TraceContextPropagationSetting =
- config.getString("automatic-trace-context-propagation") match {
- case "off" ⇒ Off
- case "monitored-actors-only" ⇒ MonitoredActorsOnly
- case "always" ⇒ Always
- case other ⇒ sys.error(s"Unrecognized option [$other] for the kamon.akka.automatic-trace-context-propagation config.")
- }
-}
-
+} \ No newline at end of file
diff --git a/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorMonitor.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorMonitor.scala
index a2b920a2..6bbefc6f 100644
--- a/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorMonitor.scala
+++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorMonitor.scala
@@ -1,23 +1,19 @@
package akka.kamon.instrumentation
import akka.actor.{ Cell, ActorRef, ActorSystem }
-import akka.kamon.instrumentation.ActorMonitors.{ TrackedRouteeWithContextPropagation, TrackedRoutee, ContextPropagation, TrackedActor }
+import akka.kamon.instrumentation.ActorMonitors.{ TrackedRoutee, TrackedActor }
import kamon.Kamon
-import kamon.akka.TraceContextPropagationSettings.{ Always, MonitoredActorsOnly, Off }
-import kamon.akka.{ AkkaExtension, RouterMetrics, ActorMetrics }
+import kamon.akka.{ RouterMetrics, ActorMetrics }
import kamon.metric.Entity
import kamon.trace.{ TraceContext, EmptyTraceContext, Tracer }
import kamon.util.RelativeNanoTimestamp
import org.aspectj.lang.ProceedingJoinPoint
trait ActorMonitor {
- def captureEnvelopeContext(): EnvelopeContext = EnvelopeContext(captureTimestamp, captureTraceContext)
- def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = pjp.proceed()
- def processFailure(failure: Throwable): Unit = {}
- def cleanup(): Unit = {}
-
- protected def captureTimestamp: RelativeNanoTimestamp = RelativeNanoTimestamp.zero
- protected def captureTraceContext: TraceContext = EmptyTraceContext
+ def captureEnvelopeContext(): EnvelopeContext
+ def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef
+ def processFailure(failure: Throwable): Unit
+ def cleanup(): Unit
}
object ActorMonitor {
@@ -38,45 +34,56 @@ object ActorMonitor {
def createRegularActorMonitor(cellInfo: CellInfo): ActorMonitor = {
def actorMetrics = Kamon.metrics.entity(ActorMetrics, cellInfo.entity)
- AkkaExtension.traceContextPropagation match {
- case Off if cellInfo.isTracked ⇒ new TrackedActor(cellInfo.entity, actorMetrics)
- case Off ⇒ ActorMonitors.NoOp
- case MonitoredActorsOnly if cellInfo.isTracked ⇒ new TrackedActor(cellInfo.entity, actorMetrics) with ContextPropagation
- case MonitoredActorsOnly ⇒ ActorMonitors.NoOp
- case Always if cellInfo.isTracked ⇒ new TrackedActor(cellInfo.entity, actorMetrics) with ContextPropagation
- case Always ⇒ ActorMonitors.ContextPropagationOnly
- }
+ if (cellInfo.isTracked)
+ new TrackedActor(cellInfo.entity, actorMetrics)
+ else ActorMonitors.ContextPropagationOnly
}
def createRouteeMonitor(cellInfo: CellInfo): ActorMonitor = {
def routerMetrics = Kamon.metrics.entity(RouterMetrics, cellInfo.entity)
- AkkaExtension.traceContextPropagation match {
- case Off if cellInfo.isTracked ⇒ new TrackedRoutee(cellInfo.entity, routerMetrics)
- case Off ⇒ ActorMonitors.NoOp
- case MonitoredActorsOnly if cellInfo.isTracked ⇒ new TrackedRouteeWithContextPropagation(cellInfo.entity, routerMetrics)
- case MonitoredActorsOnly ⇒ ActorMonitors.NoOp
- case Always if cellInfo.isTracked ⇒ new TrackedRouteeWithContextPropagation(cellInfo.entity, routerMetrics)
- case Always ⇒ ActorMonitors.ContextPropagationOnly
- }
+ if (cellInfo.isTracked)
+ new TrackedRoutee(cellInfo.entity, routerMetrics)
+ else ActorMonitors.ContextPropagationOnly
}
}
object ActorMonitors {
- val NoOp = new ActorMonitor {}
- val ContextPropagationOnly = new ActorMonitor with ContextPropagation
+ val NoOp = new ActorMonitor {
+ override def captureEnvelopeContext(): EnvelopeContext = EnvelopeContext(RelativeNanoTimestamp.zero, EmptyTraceContext)
+ override def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = pjp.proceed()
+ override def processFailure(failure: Throwable): Unit = {}
+ override def cleanup(): Unit = {}
+ }
+
+ val ContextPropagationOnly = new ActorMonitor {
+ def captureEnvelopeContext(): EnvelopeContext =
+ EnvelopeContext(RelativeNanoTimestamp.now, Tracer.currentContext)
+
+ def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = {
+ Tracer.withContext(envelopeContext.context) {
+ pjp.proceed()
+ }
+ }
+
+ def processFailure(failure: Throwable): Unit = {}
+ def cleanup(): Unit = {}
+
+ }
class TrackedActor(val entity: Entity, actorMetrics: ActorMetrics) extends ActorMonitor {
- override def captureEnvelopeContext(): EnvelopeContext = {
+ def captureEnvelopeContext(): EnvelopeContext = {
actorMetrics.mailboxSize.increment()
- super.captureEnvelopeContext()
+ EnvelopeContext(RelativeNanoTimestamp.now, Tracer.currentContext)
}
- override def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = {
+ def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = {
val timestampBeforeProcessing = RelativeNanoTimestamp.now
try {
- super.processMessage(pjp, envelopeContext)
+ Tracer.withContext(envelopeContext.context) {
+ pjp.proceed()
+ }
} finally {
val timestampAfterProcessing = RelativeNanoTimestamp.now
@@ -89,18 +96,21 @@ object ActorMonitors {
}
}
- override def processFailure(failure: Throwable): Unit = actorMetrics.errors.increment()
- override def captureTimestamp: RelativeNanoTimestamp = RelativeNanoTimestamp.now
- override def cleanup(): Unit = Kamon.metrics.removeEntity(entity)
+ def processFailure(failure: Throwable): Unit = actorMetrics.errors.increment()
+ def cleanup(): Unit = Kamon.metrics.removeEntity(entity)
}
class TrackedRoutee(val entity: Entity, routerMetrics: RouterMetrics) extends ActorMonitor {
+ def captureEnvelopeContext(): EnvelopeContext =
+ EnvelopeContext(RelativeNanoTimestamp.now, Tracer.currentContext)
- override def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = {
+ def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = {
val timestampBeforeProcessing = RelativeNanoTimestamp.now
try {
- super.processMessage(pjp, envelopeContext)
+ Tracer.withContext(envelopeContext.context) {
+ pjp.proceed()
+ }
} finally {
val timestampAfterProcessing = RelativeNanoTimestamp.now
@@ -112,24 +122,7 @@ object ActorMonitors {
}
}
- override def processFailure(failure: Throwable): Unit = routerMetrics.errors.increment()
- override def captureTimestamp: RelativeNanoTimestamp = RelativeNanoTimestamp.now
- override def cleanup(): Unit = {}
+ def processFailure(failure: Throwable): Unit = routerMetrics.errors.increment()
+ def cleanup(): Unit = {}
}
-
- trait ContextPropagation extends ActorMonitor {
- override protected def captureTraceContext: TraceContext = Tracer.currentContext
-
- override def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = {
- Tracer.withContext(envelopeContext.context) {
- super.processMessage(pjp, envelopeContext)
- }
- }
- }
-
- class TrackedActorWithContextPropagation(entity: Entity, actorMetrics: ActorMetrics)
- extends TrackedActor(entity, actorMetrics) with ContextPropagation
-
- class TrackedRouteeWithContextPropagation(entity: Entity, routerMetrics: RouterMetrics)
- extends TrackedRoutee(entity, routerMetrics) with ContextPropagation
} \ No newline at end of file