aboutsummaryrefslogtreecommitdiff
path: root/kamon-akka-remote/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-akka-remote/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala')
-rw-r--r--kamon-akka-remote/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala169
1 files changed, 169 insertions, 0 deletions
diff --git a/kamon-akka-remote/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala b/kamon-akka-remote/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala
new file mode 100644
index 00000000..29276dd0
--- /dev/null
+++ b/kamon-akka-remote/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala
@@ -0,0 +1,169 @@
+package kamon.instrumentation.akka
+
+import akka.actor.SupervisorStrategy.Resume
+import akka.actor._
+import akka.pattern.{ ask, pipe }
+import akka.remote.RemoteScope
+import akka.routing.RoundRobinGroup
+import akka.testkit.{ ImplicitSender, TestKitBase }
+import akka.util.Timeout
+import com.typesafe.config.ConfigFactory
+import kamon.trace.TraceRecorder
+import org.scalatest.{ Matchers, WordSpecLike }
+
+import scala.concurrent.duration._
+import scala.util.control.NonFatal
+
+class RemotingInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender {
+ implicit lazy val system: ActorSystem = ActorSystem("remoting-spec-local-system", ConfigFactory.parseString(
+ """
+ |akka {
+ | actor {
+ | provider = "akka.remote.RemoteActorRefProvider"
+ | }
+ | remote {
+ | enabled-transports = ["akka.remote.netty.tcp"]
+ | netty.tcp {
+ | hostname = "127.0.0.1"
+ | port = 2552
+ | }
+ | }
+ |}
+ """.stripMargin))
+
+ val remoteSystem: ActorSystem = ActorSystem("remoting-spec-remote-system", ConfigFactory.parseString(
+ """
+ |akka {
+ | actor {
+ | provider = "akka.remote.RemoteActorRefProvider"
+ | }
+ | remote {
+ | enabled-transports = ["akka.remote.netty.tcp"]
+ | netty.tcp {
+ | hostname = "127.0.0.1"
+ | port = 2553
+ | }
+ | }
+ |}
+ """.stripMargin))
+
+ val RemoteSystemAddress = AddressFromURIString("akka.tcp://remoting-spec-remote-system@127.0.0.1:2553")
+
+ "The Remoting instrumentation" should {
+ "propagate the TraceContext when creating a new remote actor" in {
+ TraceRecorder.withNewTraceContext("deploy-remote-actor", Some("deploy-remote-actor-1")) {
+ system.actorOf(TraceTokenReplier.remoteProps(Some(testActor), RemoteSystemAddress), "remote-deploy-fixture")
+ }
+
+ expectMsg("name=deploy-remote-actor|token=deploy-remote-actor-1|isOpen=true")
+ }
+
+ "propagate the TraceContext when sending a message to a remotely deployed actor" in {
+ val remoteRef = system.actorOf(TraceTokenReplier.remoteProps(None, RemoteSystemAddress), "remote-message-fixture")
+
+ TraceRecorder.withNewTraceContext("message-remote-actor", Some("message-remote-actor-1")) {
+ remoteRef ! "reply-trace-token"
+ }
+
+ expectMsg("name=message-remote-actor|token=message-remote-actor-1|isOpen=true")
+ }
+
+ "propagate the TraceContext when pipe or ask a message to a remotely deployed actor" in {
+ implicit val ec = system.dispatcher
+ implicit val askTimeout = Timeout(10 seconds)
+ val remoteRef = system.actorOf(TraceTokenReplier.remoteProps(None, RemoteSystemAddress), "remote-ask-and-pipe-fixture")
+
+ TraceRecorder.withNewTraceContext("ask-and-pipe-remote-actor", Some("ask-and-pipe-remote-actor-1")) {
+ (remoteRef ? "reply-trace-token") pipeTo (testActor)
+ }
+
+ expectMsg("name=ask-and-pipe-remote-actor|token=ask-and-pipe-remote-actor-1|isOpen=true")
+ }
+
+ "propagate the TraceContext when sending a message to an ActorSelection" in {
+ remoteSystem.actorOf(TraceTokenReplier.props(None), "actor-selection-target-a")
+ remoteSystem.actorOf(TraceTokenReplier.props(None), "actor-selection-target-b")
+ val selection = system.actorSelection(RemoteSystemAddress + "/user/actor-selection-target-*")
+
+ TraceRecorder.withNewTraceContext("message-remote-actor-selection", Some("message-remote-actor-selection-1")) {
+ selection ! "reply-trace-token"
+ }
+
+ // one for each selected actor
+ expectMsg("name=message-remote-actor-selection|token=message-remote-actor-selection-1|isOpen=true")
+ expectMsg("name=message-remote-actor-selection|token=message-remote-actor-selection-1|isOpen=true")
+ }
+
+ "propagate the TraceContext a remotely supervised child fails" in {
+ val supervisor = system.actorOf(Props(new SupervisorOfRemote(testActor, RemoteSystemAddress)))
+
+ TraceRecorder.withNewTraceContext("remote-supervision", Some("remote-supervision-1")) {
+ supervisor ! "fail"
+ }
+
+ expectMsg("name=remote-supervision|token=remote-supervision-1|isOpen=true")
+ }
+
+ "propagate the TraceContext when sending messages to remote routees of a router" in {
+ remoteSystem.actorOf(TraceTokenReplier.props(None), "remote-routee")
+ val router = system.actorOf(RoundRobinGroup(List(RemoteSystemAddress + "/user/actor-selection-target-*")).props(), "router")
+
+ TraceRecorder.withNewTraceContext("remote-routee", Some("remote-routee-1")) {
+ router ! "reply-trace-token"
+ }
+
+ expectMsg("name=remote-routee|token=remote-routee-1|isOpen=true")
+ }
+ }
+
+}
+
+class TraceTokenReplier(creationTraceContextListener: Option[ActorRef]) extends Actor with ActorLogging {
+ creationTraceContextListener map { recipient ⇒
+ recipient ! currentTraceContextInfo
+ }
+
+ def receive = {
+ case "fail" ⇒
+ throw new ArithmeticException("Division by zero.")
+ case "reply-trace-token" ⇒
+ log.info("Sending back the TT: " + TraceRecorder.currentContext.map(_.token).getOrElse("unavailable"))
+ sender ! currentTraceContextInfo
+ }
+
+ def currentTraceContextInfo: String = {
+ TraceRecorder.currentContext.map { context ⇒
+ s"name=${context.name}|token=${context.token}|isOpen=${context.isOpen}"
+ }.getOrElse("unavailable")
+ }
+}
+
+object TraceTokenReplier {
+ def props(creationTraceContextListener: Option[ActorRef]): Props =
+ Props(new TraceTokenReplier(creationTraceContextListener))
+
+ def remoteProps(creationTraceContextListener: Option[ActorRef], remoteAddress: Address): Props = {
+ Props(new TraceTokenReplier(creationTraceContextListener))
+ .withDeploy(Deploy(scope = RemoteScope(remoteAddress)))
+ }
+}
+
+class SupervisorOfRemote(traceContextListener: ActorRef, remoteAddress: Address) extends Actor {
+ val supervisedChild = context.actorOf(TraceTokenReplier.remoteProps(None, remoteAddress), "remotely-supervised-child")
+
+ def receive = {
+ case "fail" ⇒ supervisedChild ! "fail"
+ }
+
+ override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
+ case NonFatal(throwable) ⇒
+ traceContextListener ! currentTraceContextInfo
+ Resume
+ }
+
+ def currentTraceContextInfo: String = {
+ TraceRecorder.currentContext.map { context ⇒
+ s"name=${context.name}|token=${context.token}|isOpen=${context.isOpen}"
+ }.getOrElse("unavailable")
+ }
+}