From 30940181424be69e0fd64e945fe5a64b4523457b Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Fri, 24 Oct 2014 23:58:57 +0200 Subject: + akka-remote: create a new kamon-akka-remote module, closes #99 --- .../akka/RemotingInstrumentationSpec.scala | 168 --------------------- 1 file changed, 168 deletions(-) delete mode 100644 kamon-core/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala (limited to 'kamon-core/src/test') diff --git a/kamon-core/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala deleted file mode 100644 index 5ce90047..00000000 --- a/kamon-core/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala +++ /dev/null @@ -1,168 +0,0 @@ -package kamon.instrumentation.akka - -import akka.actor.SupervisorStrategy.Resume -import akka.actor._ -import akka.remote.RemoteScope -import akka.routing.RoundRobinGroup -import akka.testkit.{ ImplicitSender, TestKitBase } -import akka.util.Timeout -import com.typesafe.config.ConfigFactory -import kamon.trace.TraceRecorder -import org.scalatest.{ Matchers, WordSpecLike } -import akka.pattern.{ ask, pipe } -import scala.concurrent.duration._ -import scala.util.control.NonFatal - -class RemotingInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender { - implicit lazy val system: ActorSystem = ActorSystem("remoting-spec-local-system", ConfigFactory.parseString( - """ - |akka { - | actor { - | provider = "akka.remote.RemoteActorRefProvider" - | } - | remote { - | enabled-transports = ["akka.remote.netty.tcp"] - | netty.tcp { - | hostname = "127.0.0.1" - | port = 2552 - | } - | } - |} - """.stripMargin)) - - val remoteSystem: ActorSystem = ActorSystem("remoting-spec-remote-system", ConfigFactory.parseString( - """ - |akka { - | actor { - | provider = "akka.remote.RemoteActorRefProvider" - | } - | remote { - | enabled-transports = ["akka.remote.netty.tcp"] - | netty.tcp { - | hostname = "127.0.0.1" - | port = 2553 - | } - | } - |} - """.stripMargin)) - - val RemoteSystemAddress = AddressFromURIString("akka.tcp://remoting-spec-remote-system@127.0.0.1:2553") - - "The Remoting instrumentation" should { - "propagate the TraceContext when creating a new remote actor" in { - TraceRecorder.withNewTraceContext("deploy-remote-actor", Some("deploy-remote-actor-1")) { - system.actorOf(TraceTokenReplier.remoteProps(Some(testActor), RemoteSystemAddress), "remote-deploy-fixture") - } - - expectMsg("name=deploy-remote-actor|token=deploy-remote-actor-1|isOpen=true") - } - - "propagate the TraceContext when sending a message to a remotely deployed actor" in { - val remoteRef = system.actorOf(TraceTokenReplier.remoteProps(None, RemoteSystemAddress), "remote-message-fixture") - - TraceRecorder.withNewTraceContext("message-remote-actor", Some("message-remote-actor-1")) { - remoteRef ! "reply-trace-token" - } - - expectMsg("name=message-remote-actor|token=message-remote-actor-1|isOpen=true") - } - - "propagate the TraceContext when pipe or ask a message to a remotely deployed actor" in { - implicit val ec = system.dispatcher - implicit val askTimeout = Timeout(10 seconds) - val remoteRef = system.actorOf(TraceTokenReplier.remoteProps(None, RemoteSystemAddress), "remote-ask-and-pipe-fixture") - - TraceRecorder.withNewTraceContext("ask-and-pipe-remote-actor", Some("ask-and-pipe-remote-actor-1")) { - (remoteRef ? "reply-trace-token") pipeTo (testActor) - } - - expectMsg("name=ask-and-pipe-remote-actor|token=ask-and-pipe-remote-actor-1|isOpen=true") - } - - "propagate the TraceContext when sending a message to an ActorSelection" in { - remoteSystem.actorOf(TraceTokenReplier.props(None), "actor-selection-target-a") - remoteSystem.actorOf(TraceTokenReplier.props(None), "actor-selection-target-b") - val selection = system.actorSelection(RemoteSystemAddress + "/user/actor-selection-target-*") - - TraceRecorder.withNewTraceContext("message-remote-actor-selection", Some("message-remote-actor-selection-1")) { - selection ! "reply-trace-token" - } - - // one for each selected actor - expectMsg("name=message-remote-actor-selection|token=message-remote-actor-selection-1|isOpen=true") - expectMsg("name=message-remote-actor-selection|token=message-remote-actor-selection-1|isOpen=true") - } - - "propagate the TraceContext a remotely supervised child fails" in { - val supervisor = system.actorOf(Props(new SupervisorOfRemote(testActor, RemoteSystemAddress))) - - TraceRecorder.withNewTraceContext("remote-supervision", Some("remote-supervision-1")) { - supervisor ! "fail" - } - - expectMsg("name=remote-supervision|token=remote-supervision-1|isOpen=true") - } - - "propagate the TraceContext when sending messages to remote routees of a router" in { - remoteSystem.actorOf(TraceTokenReplier.props(None), "remote-routee") - val router = system.actorOf(RoundRobinGroup(List(RemoteSystemAddress + "/user/actor-selection-target-*")).props(), "router") - - TraceRecorder.withNewTraceContext("remote-routee", Some("remote-routee-1")) { - router ! "reply-trace-token" - } - - expectMsg("name=remote-routee|token=remote-routee-1|isOpen=true") - } - } - -} - -class TraceTokenReplier(creationTraceContextListener: Option[ActorRef]) extends Actor with ActorLogging { - creationTraceContextListener map { recipient ⇒ - recipient ! currentTraceContextInfo - } - - def receive = { - case "fail" ⇒ - throw new ArithmeticException("Division by zero.") - case "reply-trace-token" ⇒ - log.info("Sending back the TT: " + TraceRecorder.currentContext.map(_.token).getOrElse("unavailable")) - sender ! currentTraceContextInfo - } - - def currentTraceContextInfo: String = { - TraceRecorder.currentContext.map { context ⇒ - s"name=${context.name}|token=${context.token}|isOpen=${context.isOpen}" - }.getOrElse("unavailable") - } -} - -object TraceTokenReplier { - def props(creationTraceContextListener: Option[ActorRef]): Props = - Props(new TraceTokenReplier(creationTraceContextListener)) - - def remoteProps(creationTraceContextListener: Option[ActorRef], remoteAddress: Address): Props = { - Props(new TraceTokenReplier(creationTraceContextListener)) - .withDeploy(Deploy(scope = RemoteScope(remoteAddress))) - } -} - -class SupervisorOfRemote(traceContextListener: ActorRef, remoteAddress: Address) extends Actor { - val supervisedChild = context.actorOf(TraceTokenReplier.remoteProps(None, remoteAddress), "remotely-supervised-child") - - def receive = { - case "fail" ⇒ supervisedChild ! "fail" - } - - override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() { - case NonFatal(throwable) ⇒ - traceContextListener ! currentTraceContextInfo - Resume - } - - def currentTraceContextInfo: String = { - TraceRecorder.currentContext.map { context ⇒ - s"name=${context.name}|token=${context.token}|isOpen=${context.isOpen}" - }.getOrElse("unavailable") - } -} -- cgit v1.2.3