diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2014-11-03 23:27:57 +0100 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2014-11-03 23:27:57 +0100 |
commit | 59faf588080b137817444a6877170e2bd687427f (patch) | |
tree | a5f1cadf6c21915938d435e1a9d01583475d0c6a /kamon-akka-remote/src/test | |
parent | 4b999c39b6bd09d891de718fad10b795264755c6 (diff) | |
parent | 6e3d9ae88ecce10420eeac82294c54c1b43dedf4 (diff) | |
download | Kamon-59faf588080b137817444a6877170e2bd687427f.tar.gz Kamon-59faf588080b137817444a6877170e2bd687427f.tar.bz2 Kamon-59faf588080b137817444a6877170e2bd687427f.zip |
Merge branch 'master' into release-0.2, kamon-play still need fixes.
Conflicts:
kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala
kamon-core/src/test/scala/kamon/metric/RouterMetricsSpec.scala
kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala
kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala
kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala
kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala
kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala
kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala
project/Dependencies.scala
Diffstat (limited to 'kamon-akka-remote/src/test')
-rw-r--r-- | kamon-akka-remote/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala | 170 |
1 files changed, 170 insertions, 0 deletions
diff --git a/kamon-akka-remote/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala b/kamon-akka-remote/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala new file mode 100644 index 00000000..63cc9832 --- /dev/null +++ b/kamon-akka-remote/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala @@ -0,0 +1,170 @@ +package kamon.instrumentation.akka + +import akka.actor.SupervisorStrategy.Resume +import akka.actor._ +import akka.pattern.{ ask, pipe } +import akka.remote.RemoteScope +import akka.routing.RoundRobinRouter +import akka.testkit.{ ImplicitSender, TestKitBase } +import akka.util.Timeout +import com.typesafe.config.ConfigFactory +import kamon.trace.TraceRecorder +import org.scalatest.{ Matchers, WordSpecLike } + +import scala.concurrent.duration._ +import scala.util.control.NonFatal + +class RemotingInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers { + implicit def self = testActor + + implicit lazy val system: ActorSystem = ActorSystem("remoting-spec-local-system", ConfigFactory.parseString( + """ + |akka { + | actor { + | provider = "akka.remote.RemoteActorRefProvider" + | } + | remote { + | enabled-transports = ["akka.remote.netty.tcp"] + | netty.tcp { + | hostname = "127.0.0.1" + | port = 2552 + | } + | } + |} + """.stripMargin)) + + val remoteSystem: ActorSystem = ActorSystem("remoting-spec-remote-system", ConfigFactory.parseString( + """ + |akka { + | actor { + | provider = "akka.remote.RemoteActorRefProvider" + | } + | remote { + | enabled-transports = ["akka.remote.netty.tcp"] + | netty.tcp { + | hostname = "127.0.0.1" + | port = 2553 + | } + | } + |} + """.stripMargin)) + + val RemoteSystemAddress = AddressFromURIString("akka.tcp://remoting-spec-remote-system@127.0.0.1:2553") + + "The Remoting instrumentation" should { + "propagate the TraceContext when creating a new remote actor" in { + TraceRecorder.withNewTraceContext("deploy-remote-actor", Some("deploy-remote-actor-1")) { + system.actorOf(TraceTokenReplier.remoteProps(Some(testActor), RemoteSystemAddress), "remote-deploy-fixture") + } + + expectMsg("name=deploy-remote-actor|token=deploy-remote-actor-1|isOpen=true") + } + + "propagate the TraceContext when sending a message to a remotely deployed actor" in { + val remoteRef = system.actorOf(TraceTokenReplier.remoteProps(None, RemoteSystemAddress), "remote-message-fixture") + + TraceRecorder.withNewTraceContext("message-remote-actor", Some("message-remote-actor-1")) { + remoteRef ! "reply-trace-token" + } + + expectMsg("name=message-remote-actor|token=message-remote-actor-1|isOpen=true") + } + + "propagate the TraceContext when pipe or ask a message to a remotely deployed actor" in { + implicit val ec = system.dispatcher + implicit val askTimeout = Timeout(10 seconds) + val remoteRef = system.actorOf(TraceTokenReplier.remoteProps(None, RemoteSystemAddress), "remote-ask-and-pipe-fixture") + + TraceRecorder.withNewTraceContext("ask-and-pipe-remote-actor", Some("ask-and-pipe-remote-actor-1")) { + (remoteRef ? "reply-trace-token") pipeTo (testActor) + } + + expectMsg("name=ask-and-pipe-remote-actor|token=ask-and-pipe-remote-actor-1|isOpen=true") + } + + "propagate the TraceContext when sending a message to an ActorSelection" in { + remoteSystem.actorOf(TraceTokenReplier.props(None), "actor-selection-target-a") + remoteSystem.actorOf(TraceTokenReplier.props(None), "actor-selection-target-b") + val selection = system.actorSelection(RemoteSystemAddress + "/user/actor-selection-target-*") + + TraceRecorder.withNewTraceContext("message-remote-actor-selection", Some("message-remote-actor-selection-1")) { + selection ! "reply-trace-token" + } + + // one for each selected actor + expectMsg("name=message-remote-actor-selection|token=message-remote-actor-selection-1|isOpen=true") + expectMsg("name=message-remote-actor-selection|token=message-remote-actor-selection-1|isOpen=true") + } + + "propagate the TraceContext a remotely supervised child fails" in { + val supervisor = system.actorOf(Props(new SupervisorOfRemote(testActor, RemoteSystemAddress))) + + TraceRecorder.withNewTraceContext("remote-supervision", Some("remote-supervision-1")) { + supervisor ! "fail" + } + + expectMsg("name=remote-supervision|token=remote-supervision-1|isOpen=true") + } + + "propagate the TraceContext when sending messages to remote routees of a router" in { + remoteSystem.actorOf(TraceTokenReplier.props(None), "remote-routee") + val routees = Vector[String](RemoteSystemAddress + "/user/remote-routee") + val router = system.actorOf(Props.empty.withRouter(RoundRobinRouter(routees = routees))) + + TraceRecorder.withNewTraceContext("remote-routee", Some("remote-routee-1")) { + router ! "reply-trace-token" + } + + expectMsg("name=remote-routee|token=remote-routee-1|isOpen=true") + } + } + +} + +class TraceTokenReplier(creationTraceContextListener: Option[ActorRef]) extends Actor with ActorLogging { + creationTraceContextListener map { recipient ⇒ + recipient ! currentTraceContextInfo + } + + def receive = { + case "fail" ⇒ + throw new ArithmeticException("Division by zero.") + case "reply-trace-token" ⇒ + log.info("Sending back the TT: " + TraceRecorder.currentContext.token) + sender ! currentTraceContextInfo + } + + def currentTraceContextInfo: String = { + val ctx = TraceRecorder.currentContext + s"name=${ctx.name}|token=${ctx.token}|isOpen=${ctx.isOpen}" + } +} + +object TraceTokenReplier { + def props(creationTraceContextListener: Option[ActorRef]): Props = + Props(new TraceTokenReplier(creationTraceContextListener)) + + def remoteProps(creationTraceContextListener: Option[ActorRef], remoteAddress: Address): Props = { + Props(new TraceTokenReplier(creationTraceContextListener)) + .withDeploy(Deploy(scope = RemoteScope(remoteAddress))) + } +} + +class SupervisorOfRemote(traceContextListener: ActorRef, remoteAddress: Address) extends Actor { + val supervisedChild = context.actorOf(TraceTokenReplier.remoteProps(None, remoteAddress), "remotely-supervised-child") + + def receive = { + case "fail" ⇒ supervisedChild ! "fail" + } + + override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() { + case NonFatal(throwable) ⇒ + traceContextListener ! currentTraceContextInfo + Resume + } + + def currentTraceContextInfo: String = { + val ctx = TraceRecorder.currentContext + s"name=${ctx.name}|token=${ctx.token}|isOpen=${ctx.isOpen}" + } +} |