diff options
Diffstat (limited to 'kamon-akka-remote')
-rw-r--r-- | kamon-akka-remote/src/main/resources/META-INF/aop.xml | 2 | ||||
-rw-r--r-- | kamon-akka-remote/src/main/scala/kamon/akka/instrumentation/RemotingInstrumentation.scala (renamed from kamon-akka-remote/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala) | 30 | ||||
-rw-r--r-- | kamon-akka-remote/src/test/resources/logback.xml | 17 | ||||
-rw-r--r-- | kamon-akka-remote/src/test/scala/kamon/akka/instrumentation/RemotingInstrumentationSpec.scala (renamed from kamon-akka-remote/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala) | 26 |
4 files changed, 49 insertions, 26 deletions
diff --git a/kamon-akka-remote/src/main/resources/META-INF/aop.xml b/kamon-akka-remote/src/main/resources/META-INF/aop.xml index ba1c8e79..e84a6094 100644 --- a/kamon-akka-remote/src/main/resources/META-INF/aop.xml +++ b/kamon-akka-remote/src/main/resources/META-INF/aop.xml @@ -3,7 +3,7 @@ <aspectj> <aspects> <!-- Remoting and Cluster --> - <aspect name="akka.remote.instrumentation.RemotingInstrumentation"/> + <aspect name="akka.kamon.instrumentation.RemotingInstrumentation"/> </aspects> <weaver> diff --git a/kamon-akka-remote/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala b/kamon-akka-remote/src/main/scala/kamon/akka/instrumentation/RemotingInstrumentation.scala index 6bdee063..eb18ed87 100644 --- a/kamon-akka-remote/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala +++ b/kamon-akka-remote/src/main/scala/kamon/akka/instrumentation/RemotingInstrumentation.scala @@ -1,12 +1,12 @@ -package akka.remote.instrumentation +package akka.kamon.instrumentation import akka.actor.{ ActorRef, Address } import akka.remote.instrumentation.TraceContextAwareWireFormats.{ TraceContextAwareRemoteEnvelope, RemoteTraceContext, AckAndTraceContextAwareEnvelopeContainer } import akka.remote.{ RemoteActorRefProvider, Ack, SeqNo } import akka.remote.WireFormats._ import akka.util.ByteString -import kamon.MilliTimestamp -import kamon.trace.TraceRecorder +import kamon.trace.{ Tracer, TraceContext } +import kamon.util.MilliTimestamp import org.aspectj.lang.ProceedingJoinPoint import org.aspectj.lang.annotation._ @@ -32,15 +32,13 @@ class RemotingInstrumentation { envelopeBuilder.setMessage(serializedMessage) // Attach the TraceContext info, if available. - if (!TraceRecorder.currentContext.isEmpty) { - val context = TraceRecorder.currentContext - val relativeStartMilliTime = System.currentTimeMillis - ((System.nanoTime - context.startRelativeTimestamp.nanos) / 1000000) + TraceContext.map { context ⇒ envelopeBuilder.setTraceContext(RemoteTraceContext.newBuilder() .setTraceName(context.name) .setTraceToken(context.token) .setIsOpen(context.isOpen) - .setStartMilliTime(relativeStartMilliTime) + .setStartMilliTime(context.startTimestamp.toMilliTimestamp.millis) .build()) } @@ -85,14 +83,16 @@ class RemotingInstrumentation { if (ackAndEnvelope.hasEnvelope && ackAndEnvelope.getEnvelope.hasTraceContext) { val remoteTraceContext = ackAndEnvelope.getEnvelope.getTraceContext val system = provider.guardian.underlying.system - val ctx = TraceRecorder.joinRemoteTraceContext( - remoteTraceContext.getTraceName(), - remoteTraceContext.getTraceToken(), - new MilliTimestamp(remoteTraceContext.getStartMilliTime()), - remoteTraceContext.getIsOpen(), - system) - - TraceRecorder.setContext(ctx) + val tracer = Tracer.get(system) + + val ctx = tracer.newContext( + remoteTraceContext.getTraceName, + remoteTraceContext.getTraceToken, + new MilliTimestamp(remoteTraceContext.getStartMilliTime()).toRelativeNanoTimestamp, + remoteTraceContext.getIsOpen, + isLocal = false) + + TraceContext.setCurrentContext(ctx) } pjp.proceed() diff --git a/kamon-akka-remote/src/test/resources/logback.xml b/kamon-akka-remote/src/test/resources/logback.xml new file mode 100644 index 00000000..dd623d61 --- /dev/null +++ b/kamon-akka-remote/src/test/resources/logback.xml @@ -0,0 +1,17 @@ +<configuration scan="true"> + <contextListener class="ch.qos.logback.classic.jul.LevelChangePropagator"> + <resetJUL>true</resetJUL> + </contextListener> + + <conversionRule conversionWord="traceToken" converterClass="kamon.trace.logging.LogbackTraceTokenConverter"/> + + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%date{HH:mm:ss.SSS} %-5level [%traceToken][%thread] %logger{55} - %msg%n</pattern> + </encoder> + </appender> + + <root level="error"> + <appender-ref ref="STDOUT"/> + </root> +</configuration> diff --git a/kamon-akka-remote/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala b/kamon-akka-remote/src/test/scala/kamon/akka/instrumentation/RemotingInstrumentationSpec.scala index 8a3973ca..367a7349 100644 --- a/kamon-akka-remote/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala +++ b/kamon-akka-remote/src/test/scala/kamon/akka/instrumentation/RemotingInstrumentationSpec.scala @@ -8,7 +8,8 @@ import akka.routing.RoundRobinGroup import akka.testkit.{ ImplicitSender, TestKitBase } import akka.util.Timeout import com.typesafe.config.ConfigFactory -import kamon.trace.TraceRecorder +import kamon.Kamon +import kamon.trace.TraceContext import org.scalatest.{ Matchers, WordSpecLike } import scala.concurrent.duration._ @@ -18,6 +19,8 @@ class RemotingInstrumentationSpec extends TestKitBase with WordSpecLike with Mat implicit lazy val system: ActorSystem = ActorSystem("remoting-spec-local-system", ConfigFactory.parseString( """ |akka { + | loggers = ["akka.event.slf4j.Slf4jLogger"] + | | actor { | provider = "akka.remote.RemoteActorRefProvider" | } @@ -34,6 +37,8 @@ class RemotingInstrumentationSpec extends TestKitBase with WordSpecLike with Mat val remoteSystem: ActorSystem = ActorSystem("remoting-spec-remote-system", ConfigFactory.parseString( """ |akka { + | loggers = ["akka.event.slf4j.Slf4jLogger"] + | | actor { | provider = "akka.remote.RemoteActorRefProvider" | } @@ -47,11 +52,13 @@ class RemotingInstrumentationSpec extends TestKitBase with WordSpecLike with Mat |} """.stripMargin)) + lazy val kamon = Kamon(system) val RemoteSystemAddress = AddressFromURIString("akka.tcp://remoting-spec-remote-system@127.0.0.1:2553") + import kamon.tracer.newContext "The Remoting instrumentation" should { "propagate the TraceContext when creating a new remote actor" in { - TraceRecorder.withNewTraceContext("deploy-remote-actor", Some("deploy-remote-actor-1")) { + TraceContext.withContext(newContext("deploy-remote-actor", "deploy-remote-actor-1")) { system.actorOf(TraceTokenReplier.remoteProps(Some(testActor), RemoteSystemAddress), "remote-deploy-fixture") } @@ -61,7 +68,7 @@ class RemotingInstrumentationSpec extends TestKitBase with WordSpecLike with Mat "propagate the TraceContext when sending a message to a remotely deployed actor" in { val remoteRef = system.actorOf(TraceTokenReplier.remoteProps(None, RemoteSystemAddress), "remote-message-fixture") - TraceRecorder.withNewTraceContext("message-remote-actor", Some("message-remote-actor-1")) { + TraceContext.withContext(newContext("message-remote-actor", "message-remote-actor-1")) { remoteRef ! "reply-trace-token" } @@ -73,7 +80,7 @@ class RemotingInstrumentationSpec extends TestKitBase with WordSpecLike with Mat implicit val askTimeout = Timeout(10 seconds) val remoteRef = system.actorOf(TraceTokenReplier.remoteProps(None, RemoteSystemAddress), "remote-ask-and-pipe-fixture") - TraceRecorder.withNewTraceContext("ask-and-pipe-remote-actor", Some("ask-and-pipe-remote-actor-1")) { + TraceContext.withContext(newContext("ask-and-pipe-remote-actor", "ask-and-pipe-remote-actor-1")) { (remoteRef ? "reply-trace-token") pipeTo (testActor) } @@ -85,7 +92,7 @@ class RemotingInstrumentationSpec extends TestKitBase with WordSpecLike with Mat remoteSystem.actorOf(TraceTokenReplier.props(None), "actor-selection-target-b") val selection = system.actorSelection(RemoteSystemAddress + "/user/actor-selection-target-*") - TraceRecorder.withNewTraceContext("message-remote-actor-selection", Some("message-remote-actor-selection-1")) { + TraceContext.withContext(newContext("message-remote-actor-selection", "message-remote-actor-selection-1")) { selection ! "reply-trace-token" } @@ -97,7 +104,7 @@ class RemotingInstrumentationSpec extends TestKitBase with WordSpecLike with Mat "propagate the TraceContext a remotely supervised child fails" in { val supervisor = system.actorOf(Props(new SupervisorOfRemote(testActor, RemoteSystemAddress))) - TraceRecorder.withNewTraceContext("remote-supervision", Some("remote-supervision-1")) { + TraceContext.withContext(newContext("remote-supervision", "remote-supervision-1")) { supervisor ! "fail" } @@ -108,7 +115,7 @@ class RemotingInstrumentationSpec extends TestKitBase with WordSpecLike with Mat remoteSystem.actorOf(TraceTokenReplier.props(None), "remote-routee") val router = system.actorOf(RoundRobinGroup(List(RemoteSystemAddress + "/user/actor-selection-target-*")).props(), "router") - TraceRecorder.withNewTraceContext("remote-routee", Some("remote-routee-1")) { + TraceContext.withContext(newContext("remote-routee", "remote-routee-1")) { router ! "reply-trace-token" } @@ -127,12 +134,11 @@ class TraceTokenReplier(creationTraceContextListener: Option[ActorRef]) extends case "fail" ⇒ throw new ArithmeticException("Division by zero.") case "reply-trace-token" ⇒ - log.info("Sending back the TT: " + TraceRecorder.currentContext.token) sender ! currentTraceContextInfo } def currentTraceContextInfo: String = { - val ctx = TraceRecorder.currentContext + val ctx = TraceContext.currentContext s"name=${ctx.name}|token=${ctx.token}|isOpen=${ctx.isOpen}" } } @@ -161,7 +167,7 @@ class SupervisorOfRemote(traceContextListener: ActorRef, remoteAddress: Address) } def currentTraceContextInfo: String = { - val ctx = TraceRecorder.currentContext + val ctx = TraceContext.currentContext s"name=${ctx.name}|token=${ctx.token}|isOpen=${ctx.isOpen}" } } |