From 01a34f67ff75419c440f2e69c0a0db888a670a34 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Mon, 12 Jan 2015 01:45:27 +0100 Subject: ! all: improve the metric recorders infrastructure --- .../src/main/resources/META-INF/aop.xml | 2 +- .../instrumentation/RemotingInstrumentation.scala | 100 ++++++++++++ .../akka/RemotingInstrumentation.scala | 100 ------------ kamon-akka-remote/src/test/resources/logback.xml | 17 ++ .../RemotingInstrumentationSpec.scala | 173 +++++++++++++++++++++ .../akka/RemotingInstrumentationSpec.scala | 167 -------------------- 6 files changed, 291 insertions(+), 268 deletions(-) create mode 100644 kamon-akka-remote/src/main/scala/kamon/akka/instrumentation/RemotingInstrumentation.scala delete mode 100644 kamon-akka-remote/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala create mode 100644 kamon-akka-remote/src/test/resources/logback.xml create mode 100644 kamon-akka-remote/src/test/scala/kamon/akka/instrumentation/RemotingInstrumentationSpec.scala delete mode 100644 kamon-akka-remote/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala (limited to 'kamon-akka-remote') diff --git a/kamon-akka-remote/src/main/resources/META-INF/aop.xml b/kamon-akka-remote/src/main/resources/META-INF/aop.xml index ba1c8e79..e84a6094 100644 --- a/kamon-akka-remote/src/main/resources/META-INF/aop.xml +++ b/kamon-akka-remote/src/main/resources/META-INF/aop.xml @@ -3,7 +3,7 @@ - + diff --git a/kamon-akka-remote/src/main/scala/kamon/akka/instrumentation/RemotingInstrumentation.scala b/kamon-akka-remote/src/main/scala/kamon/akka/instrumentation/RemotingInstrumentation.scala new file mode 100644 index 00000000..eb18ed87 --- /dev/null +++ b/kamon-akka-remote/src/main/scala/kamon/akka/instrumentation/RemotingInstrumentation.scala @@ -0,0 +1,100 @@ +package akka.kamon.instrumentation + +import akka.actor.{ ActorRef, Address } +import akka.remote.instrumentation.TraceContextAwareWireFormats.{ TraceContextAwareRemoteEnvelope, RemoteTraceContext, AckAndTraceContextAwareEnvelopeContainer } +import akka.remote.{ RemoteActorRefProvider, Ack, SeqNo } +import akka.remote.WireFormats._ +import akka.util.ByteString +import kamon.trace.{ Tracer, TraceContext } +import kamon.util.MilliTimestamp +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation._ + +@Aspect +class RemotingInstrumentation { + + @Pointcut("execution(* akka.remote.transport.AkkaPduProtobufCodec$.constructMessage(..)) && " + + "args(localAddress, recipient, serializedMessage, senderOption, seqOption, ackOption)") + def constructAkkaPduMessage(localAddress: Address, recipient: ActorRef, serializedMessage: SerializedMessage, + senderOption: Option[ActorRef], seqOption: Option[SeqNo], ackOption: Option[Ack]): Unit = {} + + @Around("constructAkkaPduMessage(localAddress, recipient, serializedMessage, senderOption, seqOption, ackOption)") + def aroundSerializeRemoteMessage(pjp: ProceedingJoinPoint, localAddress: Address, recipient: ActorRef, + serializedMessage: SerializedMessage, senderOption: Option[ActorRef], seqOption: Option[SeqNo], ackOption: Option[Ack]): AnyRef = { + + val ackAndEnvelopeBuilder = AckAndTraceContextAwareEnvelopeContainer.newBuilder + val envelopeBuilder = TraceContextAwareRemoteEnvelope.newBuilder + + envelopeBuilder.setRecipient(serializeActorRef(recipient.path.address, recipient)) + senderOption foreach { ref ⇒ envelopeBuilder.setSender(serializeActorRef(localAddress, ref)) } + seqOption foreach { seq ⇒ envelopeBuilder.setSeq(seq.rawValue) } + ackOption foreach { ack ⇒ ackAndEnvelopeBuilder.setAck(ackBuilder(ack)) } + envelopeBuilder.setMessage(serializedMessage) + + // Attach the TraceContext info, if available. + TraceContext.map { context ⇒ + + envelopeBuilder.setTraceContext(RemoteTraceContext.newBuilder() + .setTraceName(context.name) + .setTraceToken(context.token) + .setIsOpen(context.isOpen) + .setStartMilliTime(context.startTimestamp.toMilliTimestamp.millis) + .build()) + } + + ackAndEnvelopeBuilder.setEnvelope(envelopeBuilder) + ByteString.ByteString1C(ackAndEnvelopeBuilder.build.toByteArray) //Reuse Byte Array (naughty!) + } + + // Copied from akka.remote.transport.AkkaPduProtobufCodec because of private access. + private def ackBuilder(ack: Ack): AcknowledgementInfo.Builder = { + val ackBuilder = AcknowledgementInfo.newBuilder() + ackBuilder.setCumulativeAck(ack.cumulativeAck.rawValue) + ack.nacks foreach { nack ⇒ ackBuilder.addNacks(nack.rawValue) } + ackBuilder + } + + // Copied from akka.remote.transport.AkkaPduProtobufCodec because of private access. + private def serializeActorRef(defaultAddress: Address, ref: ActorRef): ActorRefData = { + ActorRefData.newBuilder.setPath( + if (ref.path.address.host.isDefined) ref.path.toSerializationFormat + else ref.path.toSerializationFormatWithAddress(defaultAddress)).build() + } + + // Copied from akka.remote.transport.AkkaPduProtobufCodec because of private access. + private def serializeAddress(address: Address): AddressData = address match { + case Address(protocol, system, Some(host), Some(port)) ⇒ + AddressData.newBuilder + .setHostname(host) + .setPort(port) + .setSystem(system) + .setProtocol(protocol) + .build() + case _ ⇒ throw new IllegalArgumentException(s"Address [${address}] could not be serialized: host or port missing.") + } + + @Pointcut("execution(* akka.remote.transport.AkkaPduProtobufCodec$.decodeMessage(..)) && args(bs, provider, localAddress)") + def decodeRemoteMessage(bs: ByteString, provider: RemoteActorRefProvider, localAddress: Address): Unit = {} + + @Around("decodeRemoteMessage(bs, provider, localAddress)") + def aroundDecodeRemoteMessage(pjp: ProceedingJoinPoint, bs: ByteString, provider: RemoteActorRefProvider, localAddress: Address): AnyRef = { + val ackAndEnvelope = AckAndTraceContextAwareEnvelopeContainer.parseFrom(bs.toArray) + + if (ackAndEnvelope.hasEnvelope && ackAndEnvelope.getEnvelope.hasTraceContext) { + val remoteTraceContext = ackAndEnvelope.getEnvelope.getTraceContext + val system = provider.guardian.underlying.system + val tracer = Tracer.get(system) + + val ctx = tracer.newContext( + remoteTraceContext.getTraceName, + remoteTraceContext.getTraceToken, + new MilliTimestamp(remoteTraceContext.getStartMilliTime()).toRelativeNanoTimestamp, + remoteTraceContext.getIsOpen, + isLocal = false) + + TraceContext.setCurrentContext(ctx) + } + + pjp.proceed() + } +} diff --git a/kamon-akka-remote/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala b/kamon-akka-remote/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala deleted file mode 100644 index 6bdee063..00000000 --- a/kamon-akka-remote/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala +++ /dev/null @@ -1,100 +0,0 @@ -package akka.remote.instrumentation - -import akka.actor.{ ActorRef, Address } -import akka.remote.instrumentation.TraceContextAwareWireFormats.{ TraceContextAwareRemoteEnvelope, RemoteTraceContext, AckAndTraceContextAwareEnvelopeContainer } -import akka.remote.{ RemoteActorRefProvider, Ack, SeqNo } -import akka.remote.WireFormats._ -import akka.util.ByteString -import kamon.MilliTimestamp -import kamon.trace.TraceRecorder -import org.aspectj.lang.ProceedingJoinPoint -import org.aspectj.lang.annotation._ - -@Aspect -class RemotingInstrumentation { - - @Pointcut("execution(* akka.remote.transport.AkkaPduProtobufCodec$.constructMessage(..)) && " + - "args(localAddress, recipient, serializedMessage, senderOption, seqOption, ackOption)") - def constructAkkaPduMessage(localAddress: Address, recipient: ActorRef, serializedMessage: SerializedMessage, - senderOption: Option[ActorRef], seqOption: Option[SeqNo], ackOption: Option[Ack]): Unit = {} - - @Around("constructAkkaPduMessage(localAddress, recipient, serializedMessage, senderOption, seqOption, ackOption)") - def aroundSerializeRemoteMessage(pjp: ProceedingJoinPoint, localAddress: Address, recipient: ActorRef, - serializedMessage: SerializedMessage, senderOption: Option[ActorRef], seqOption: Option[SeqNo], ackOption: Option[Ack]): AnyRef = { - - val ackAndEnvelopeBuilder = AckAndTraceContextAwareEnvelopeContainer.newBuilder - val envelopeBuilder = TraceContextAwareRemoteEnvelope.newBuilder - - envelopeBuilder.setRecipient(serializeActorRef(recipient.path.address, recipient)) - senderOption foreach { ref ⇒ envelopeBuilder.setSender(serializeActorRef(localAddress, ref)) } - seqOption foreach { seq ⇒ envelopeBuilder.setSeq(seq.rawValue) } - ackOption foreach { ack ⇒ ackAndEnvelopeBuilder.setAck(ackBuilder(ack)) } - envelopeBuilder.setMessage(serializedMessage) - - // Attach the TraceContext info, if available. - if (!TraceRecorder.currentContext.isEmpty) { - val context = TraceRecorder.currentContext - val relativeStartMilliTime = System.currentTimeMillis - ((System.nanoTime - context.startRelativeTimestamp.nanos) / 1000000) - - envelopeBuilder.setTraceContext(RemoteTraceContext.newBuilder() - .setTraceName(context.name) - .setTraceToken(context.token) - .setIsOpen(context.isOpen) - .setStartMilliTime(relativeStartMilliTime) - .build()) - } - - ackAndEnvelopeBuilder.setEnvelope(envelopeBuilder) - ByteString.ByteString1C(ackAndEnvelopeBuilder.build.toByteArray) //Reuse Byte Array (naughty!) - } - - // Copied from akka.remote.transport.AkkaPduProtobufCodec because of private access. - private def ackBuilder(ack: Ack): AcknowledgementInfo.Builder = { - val ackBuilder = AcknowledgementInfo.newBuilder() - ackBuilder.setCumulativeAck(ack.cumulativeAck.rawValue) - ack.nacks foreach { nack ⇒ ackBuilder.addNacks(nack.rawValue) } - ackBuilder - } - - // Copied from akka.remote.transport.AkkaPduProtobufCodec because of private access. - private def serializeActorRef(defaultAddress: Address, ref: ActorRef): ActorRefData = { - ActorRefData.newBuilder.setPath( - if (ref.path.address.host.isDefined) ref.path.toSerializationFormat - else ref.path.toSerializationFormatWithAddress(defaultAddress)).build() - } - - // Copied from akka.remote.transport.AkkaPduProtobufCodec because of private access. - private def serializeAddress(address: Address): AddressData = address match { - case Address(protocol, system, Some(host), Some(port)) ⇒ - AddressData.newBuilder - .setHostname(host) - .setPort(port) - .setSystem(system) - .setProtocol(protocol) - .build() - case _ ⇒ throw new IllegalArgumentException(s"Address [${address}] could not be serialized: host or port missing.") - } - - @Pointcut("execution(* akka.remote.transport.AkkaPduProtobufCodec$.decodeMessage(..)) && args(bs, provider, localAddress)") - def decodeRemoteMessage(bs: ByteString, provider: RemoteActorRefProvider, localAddress: Address): Unit = {} - - @Around("decodeRemoteMessage(bs, provider, localAddress)") - def aroundDecodeRemoteMessage(pjp: ProceedingJoinPoint, bs: ByteString, provider: RemoteActorRefProvider, localAddress: Address): AnyRef = { - val ackAndEnvelope = AckAndTraceContextAwareEnvelopeContainer.parseFrom(bs.toArray) - - if (ackAndEnvelope.hasEnvelope && ackAndEnvelope.getEnvelope.hasTraceContext) { - val remoteTraceContext = ackAndEnvelope.getEnvelope.getTraceContext - val system = provider.guardian.underlying.system - val ctx = TraceRecorder.joinRemoteTraceContext( - remoteTraceContext.getTraceName(), - remoteTraceContext.getTraceToken(), - new MilliTimestamp(remoteTraceContext.getStartMilliTime()), - remoteTraceContext.getIsOpen(), - system) - - TraceRecorder.setContext(ctx) - } - - pjp.proceed() - } -} diff --git a/kamon-akka-remote/src/test/resources/logback.xml b/kamon-akka-remote/src/test/resources/logback.xml new file mode 100644 index 00000000..dd623d61 --- /dev/null +++ b/kamon-akka-remote/src/test/resources/logback.xml @@ -0,0 +1,17 @@ + + + true + + + + + + + %date{HH:mm:ss.SSS} %-5level [%traceToken][%thread] %logger{55} - %msg%n + + + + + + + diff --git a/kamon-akka-remote/src/test/scala/kamon/akka/instrumentation/RemotingInstrumentationSpec.scala b/kamon-akka-remote/src/test/scala/kamon/akka/instrumentation/RemotingInstrumentationSpec.scala new file mode 100644 index 00000000..367a7349 --- /dev/null +++ b/kamon-akka-remote/src/test/scala/kamon/akka/instrumentation/RemotingInstrumentationSpec.scala @@ -0,0 +1,173 @@ +package kamon.instrumentation.akka + +import akka.actor.SupervisorStrategy.Resume +import akka.actor._ +import akka.pattern.{ ask, pipe } +import akka.remote.RemoteScope +import akka.routing.RoundRobinGroup +import akka.testkit.{ ImplicitSender, TestKitBase } +import akka.util.Timeout +import com.typesafe.config.ConfigFactory +import kamon.Kamon +import kamon.trace.TraceContext +import org.scalatest.{ Matchers, WordSpecLike } + +import scala.concurrent.duration._ +import scala.util.control.NonFatal + +class RemotingInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender { + implicit lazy val system: ActorSystem = ActorSystem("remoting-spec-local-system", ConfigFactory.parseString( + """ + |akka { + | loggers = ["akka.event.slf4j.Slf4jLogger"] + | + | actor { + | provider = "akka.remote.RemoteActorRefProvider" + | } + | remote { + | enabled-transports = ["akka.remote.netty.tcp"] + | netty.tcp { + | hostname = "127.0.0.1" + | port = 2552 + | } + | } + |} + """.stripMargin)) + + val remoteSystem: ActorSystem = ActorSystem("remoting-spec-remote-system", ConfigFactory.parseString( + """ + |akka { + | loggers = ["akka.event.slf4j.Slf4jLogger"] + | + | actor { + | provider = "akka.remote.RemoteActorRefProvider" + | } + | remote { + | enabled-transports = ["akka.remote.netty.tcp"] + | netty.tcp { + | hostname = "127.0.0.1" + | port = 2553 + | } + | } + |} + """.stripMargin)) + + lazy val kamon = Kamon(system) + val RemoteSystemAddress = AddressFromURIString("akka.tcp://remoting-spec-remote-system@127.0.0.1:2553") + import kamon.tracer.newContext + + "The Remoting instrumentation" should { + "propagate the TraceContext when creating a new remote actor" in { + TraceContext.withContext(newContext("deploy-remote-actor", "deploy-remote-actor-1")) { + system.actorOf(TraceTokenReplier.remoteProps(Some(testActor), RemoteSystemAddress), "remote-deploy-fixture") + } + + expectMsg("name=deploy-remote-actor|token=deploy-remote-actor-1|isOpen=true") + } + + "propagate the TraceContext when sending a message to a remotely deployed actor" in { + val remoteRef = system.actorOf(TraceTokenReplier.remoteProps(None, RemoteSystemAddress), "remote-message-fixture") + + TraceContext.withContext(newContext("message-remote-actor", "message-remote-actor-1")) { + remoteRef ! "reply-trace-token" + } + + expectMsg("name=message-remote-actor|token=message-remote-actor-1|isOpen=true") + } + + "propagate the TraceContext when pipe or ask a message to a remotely deployed actor" in { + implicit val ec = system.dispatcher + implicit val askTimeout = Timeout(10 seconds) + val remoteRef = system.actorOf(TraceTokenReplier.remoteProps(None, RemoteSystemAddress), "remote-ask-and-pipe-fixture") + + TraceContext.withContext(newContext("ask-and-pipe-remote-actor", "ask-and-pipe-remote-actor-1")) { + (remoteRef ? "reply-trace-token") pipeTo (testActor) + } + + expectMsg("name=ask-and-pipe-remote-actor|token=ask-and-pipe-remote-actor-1|isOpen=true") + } + + "propagate the TraceContext when sending a message to an ActorSelection" in { + remoteSystem.actorOf(TraceTokenReplier.props(None), "actor-selection-target-a") + remoteSystem.actorOf(TraceTokenReplier.props(None), "actor-selection-target-b") + val selection = system.actorSelection(RemoteSystemAddress + "/user/actor-selection-target-*") + + TraceContext.withContext(newContext("message-remote-actor-selection", "message-remote-actor-selection-1")) { + selection ! "reply-trace-token" + } + + // one for each selected actor + expectMsg("name=message-remote-actor-selection|token=message-remote-actor-selection-1|isOpen=true") + expectMsg("name=message-remote-actor-selection|token=message-remote-actor-selection-1|isOpen=true") + } + + "propagate the TraceContext a remotely supervised child fails" in { + val supervisor = system.actorOf(Props(new SupervisorOfRemote(testActor, RemoteSystemAddress))) + + TraceContext.withContext(newContext("remote-supervision", "remote-supervision-1")) { + supervisor ! "fail" + } + + expectMsg("name=remote-supervision|token=remote-supervision-1|isOpen=true") + } + + "propagate the TraceContext when sending messages to remote routees of a router" in { + remoteSystem.actorOf(TraceTokenReplier.props(None), "remote-routee") + val router = system.actorOf(RoundRobinGroup(List(RemoteSystemAddress + "/user/actor-selection-target-*")).props(), "router") + + TraceContext.withContext(newContext("remote-routee", "remote-routee-1")) { + router ! "reply-trace-token" + } + + expectMsg("name=remote-routee|token=remote-routee-1|isOpen=true") + } + } + +} + +class TraceTokenReplier(creationTraceContextListener: Option[ActorRef]) extends Actor with ActorLogging { + creationTraceContextListener map { recipient ⇒ + recipient ! currentTraceContextInfo + } + + def receive = { + case "fail" ⇒ + throw new ArithmeticException("Division by zero.") + case "reply-trace-token" ⇒ + sender ! currentTraceContextInfo + } + + def currentTraceContextInfo: String = { + val ctx = TraceContext.currentContext + s"name=${ctx.name}|token=${ctx.token}|isOpen=${ctx.isOpen}" + } +} + +object TraceTokenReplier { + def props(creationTraceContextListener: Option[ActorRef]): Props = + Props(new TraceTokenReplier(creationTraceContextListener)) + + def remoteProps(creationTraceContextListener: Option[ActorRef], remoteAddress: Address): Props = { + Props(new TraceTokenReplier(creationTraceContextListener)) + .withDeploy(Deploy(scope = RemoteScope(remoteAddress))) + } +} + +class SupervisorOfRemote(traceContextListener: ActorRef, remoteAddress: Address) extends Actor { + val supervisedChild = context.actorOf(TraceTokenReplier.remoteProps(None, remoteAddress), "remotely-supervised-child") + + def receive = { + case "fail" ⇒ supervisedChild ! "fail" + } + + override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() { + case NonFatal(throwable) ⇒ + traceContextListener ! currentTraceContextInfo + Resume + } + + def currentTraceContextInfo: String = { + val ctx = TraceContext.currentContext + s"name=${ctx.name}|token=${ctx.token}|isOpen=${ctx.isOpen}" + } +} diff --git a/kamon-akka-remote/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala b/kamon-akka-remote/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala deleted file mode 100644 index 8a3973ca..00000000 --- a/kamon-akka-remote/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala +++ /dev/null @@ -1,167 +0,0 @@ -package kamon.instrumentation.akka - -import akka.actor.SupervisorStrategy.Resume -import akka.actor._ -import akka.pattern.{ ask, pipe } -import akka.remote.RemoteScope -import akka.routing.RoundRobinGroup -import akka.testkit.{ ImplicitSender, TestKitBase } -import akka.util.Timeout -import com.typesafe.config.ConfigFactory -import kamon.trace.TraceRecorder -import org.scalatest.{ Matchers, WordSpecLike } - -import scala.concurrent.duration._ -import scala.util.control.NonFatal - -class RemotingInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender { - implicit lazy val system: ActorSystem = ActorSystem("remoting-spec-local-system", ConfigFactory.parseString( - """ - |akka { - | actor { - | provider = "akka.remote.RemoteActorRefProvider" - | } - | remote { - | enabled-transports = ["akka.remote.netty.tcp"] - | netty.tcp { - | hostname = "127.0.0.1" - | port = 2552 - | } - | } - |} - """.stripMargin)) - - val remoteSystem: ActorSystem = ActorSystem("remoting-spec-remote-system", ConfigFactory.parseString( - """ - |akka { - | actor { - | provider = "akka.remote.RemoteActorRefProvider" - | } - | remote { - | enabled-transports = ["akka.remote.netty.tcp"] - | netty.tcp { - | hostname = "127.0.0.1" - | port = 2553 - | } - | } - |} - """.stripMargin)) - - val RemoteSystemAddress = AddressFromURIString("akka.tcp://remoting-spec-remote-system@127.0.0.1:2553") - - "The Remoting instrumentation" should { - "propagate the TraceContext when creating a new remote actor" in { - TraceRecorder.withNewTraceContext("deploy-remote-actor", Some("deploy-remote-actor-1")) { - system.actorOf(TraceTokenReplier.remoteProps(Some(testActor), RemoteSystemAddress), "remote-deploy-fixture") - } - - expectMsg("name=deploy-remote-actor|token=deploy-remote-actor-1|isOpen=true") - } - - "propagate the TraceContext when sending a message to a remotely deployed actor" in { - val remoteRef = system.actorOf(TraceTokenReplier.remoteProps(None, RemoteSystemAddress), "remote-message-fixture") - - TraceRecorder.withNewTraceContext("message-remote-actor", Some("message-remote-actor-1")) { - remoteRef ! "reply-trace-token" - } - - expectMsg("name=message-remote-actor|token=message-remote-actor-1|isOpen=true") - } - - "propagate the TraceContext when pipe or ask a message to a remotely deployed actor" in { - implicit val ec = system.dispatcher - implicit val askTimeout = Timeout(10 seconds) - val remoteRef = system.actorOf(TraceTokenReplier.remoteProps(None, RemoteSystemAddress), "remote-ask-and-pipe-fixture") - - TraceRecorder.withNewTraceContext("ask-and-pipe-remote-actor", Some("ask-and-pipe-remote-actor-1")) { - (remoteRef ? "reply-trace-token") pipeTo (testActor) - } - - expectMsg("name=ask-and-pipe-remote-actor|token=ask-and-pipe-remote-actor-1|isOpen=true") - } - - "propagate the TraceContext when sending a message to an ActorSelection" in { - remoteSystem.actorOf(TraceTokenReplier.props(None), "actor-selection-target-a") - remoteSystem.actorOf(TraceTokenReplier.props(None), "actor-selection-target-b") - val selection = system.actorSelection(RemoteSystemAddress + "/user/actor-selection-target-*") - - TraceRecorder.withNewTraceContext("message-remote-actor-selection", Some("message-remote-actor-selection-1")) { - selection ! "reply-trace-token" - } - - // one for each selected actor - expectMsg("name=message-remote-actor-selection|token=message-remote-actor-selection-1|isOpen=true") - expectMsg("name=message-remote-actor-selection|token=message-remote-actor-selection-1|isOpen=true") - } - - "propagate the TraceContext a remotely supervised child fails" in { - val supervisor = system.actorOf(Props(new SupervisorOfRemote(testActor, RemoteSystemAddress))) - - TraceRecorder.withNewTraceContext("remote-supervision", Some("remote-supervision-1")) { - supervisor ! "fail" - } - - expectMsg("name=remote-supervision|token=remote-supervision-1|isOpen=true") - } - - "propagate the TraceContext when sending messages to remote routees of a router" in { - remoteSystem.actorOf(TraceTokenReplier.props(None), "remote-routee") - val router = system.actorOf(RoundRobinGroup(List(RemoteSystemAddress + "/user/actor-selection-target-*")).props(), "router") - - TraceRecorder.withNewTraceContext("remote-routee", Some("remote-routee-1")) { - router ! "reply-trace-token" - } - - expectMsg("name=remote-routee|token=remote-routee-1|isOpen=true") - } - } - -} - -class TraceTokenReplier(creationTraceContextListener: Option[ActorRef]) extends Actor with ActorLogging { - creationTraceContextListener map { recipient ⇒ - recipient ! currentTraceContextInfo - } - - def receive = { - case "fail" ⇒ - throw new ArithmeticException("Division by zero.") - case "reply-trace-token" ⇒ - log.info("Sending back the TT: " + TraceRecorder.currentContext.token) - sender ! currentTraceContextInfo - } - - def currentTraceContextInfo: String = { - val ctx = TraceRecorder.currentContext - s"name=${ctx.name}|token=${ctx.token}|isOpen=${ctx.isOpen}" - } -} - -object TraceTokenReplier { - def props(creationTraceContextListener: Option[ActorRef]): Props = - Props(new TraceTokenReplier(creationTraceContextListener)) - - def remoteProps(creationTraceContextListener: Option[ActorRef], remoteAddress: Address): Props = { - Props(new TraceTokenReplier(creationTraceContextListener)) - .withDeploy(Deploy(scope = RemoteScope(remoteAddress))) - } -} - -class SupervisorOfRemote(traceContextListener: ActorRef, remoteAddress: Address) extends Actor { - val supervisedChild = context.actorOf(TraceTokenReplier.remoteProps(None, remoteAddress), "remotely-supervised-child") - - def receive = { - case "fail" ⇒ supervisedChild ! "fail" - } - - override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() { - case NonFatal(throwable) ⇒ - traceContextListener ! currentTraceContextInfo - Resume - } - - def currentTraceContextInfo: String = { - val ctx = TraceRecorder.currentContext - s"name=${ctx.name}|token=${ctx.token}|isOpen=${ctx.isOpen}" - } -} -- cgit v1.2.3