aboutsummaryrefslogtreecommitdiff
path: root/kamon-akka-remote
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2015-01-12 01:45:27 +0100
committerIvan Topolnjak <ivantopo@gmail.com>2015-01-24 23:19:01 +0100
commit01a34f67ff75419c440f2e69c0a0db888a670a34 (patch)
tree9c4dee4e9c13c26937356950f9e4927c3f9dfb7d /kamon-akka-remote
parent4a47e92d23af371f1d50b40af6cbe00a5ffc0105 (diff)
downloadKamon-01a34f67ff75419c440f2e69c0a0db888a670a34.tar.gz
Kamon-01a34f67ff75419c440f2e69c0a0db888a670a34.tar.bz2
Kamon-01a34f67ff75419c440f2e69c0a0db888a670a34.zip
! all: improve the metric recorders infrastructure
Diffstat (limited to 'kamon-akka-remote')
-rw-r--r--kamon-akka-remote/src/main/resources/META-INF/aop.xml2
-rw-r--r--kamon-akka-remote/src/main/scala/kamon/akka/instrumentation/RemotingInstrumentation.scala (renamed from kamon-akka-remote/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala)30
-rw-r--r--kamon-akka-remote/src/test/resources/logback.xml17
-rw-r--r--kamon-akka-remote/src/test/scala/kamon/akka/instrumentation/RemotingInstrumentationSpec.scala (renamed from kamon-akka-remote/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala)26
4 files changed, 49 insertions, 26 deletions
diff --git a/kamon-akka-remote/src/main/resources/META-INF/aop.xml b/kamon-akka-remote/src/main/resources/META-INF/aop.xml
index ba1c8e79..e84a6094 100644
--- a/kamon-akka-remote/src/main/resources/META-INF/aop.xml
+++ b/kamon-akka-remote/src/main/resources/META-INF/aop.xml
@@ -3,7 +3,7 @@
<aspectj>
<aspects>
<!-- Remoting and Cluster -->
- <aspect name="akka.remote.instrumentation.RemotingInstrumentation"/>
+ <aspect name="akka.kamon.instrumentation.RemotingInstrumentation"/>
</aspects>
<weaver>
diff --git a/kamon-akka-remote/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala b/kamon-akka-remote/src/main/scala/kamon/akka/instrumentation/RemotingInstrumentation.scala
index 6bdee063..eb18ed87 100644
--- a/kamon-akka-remote/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala
+++ b/kamon-akka-remote/src/main/scala/kamon/akka/instrumentation/RemotingInstrumentation.scala
@@ -1,12 +1,12 @@
-package akka.remote.instrumentation
+package akka.kamon.instrumentation
import akka.actor.{ ActorRef, Address }
import akka.remote.instrumentation.TraceContextAwareWireFormats.{ TraceContextAwareRemoteEnvelope, RemoteTraceContext, AckAndTraceContextAwareEnvelopeContainer }
import akka.remote.{ RemoteActorRefProvider, Ack, SeqNo }
import akka.remote.WireFormats._
import akka.util.ByteString
-import kamon.MilliTimestamp
-import kamon.trace.TraceRecorder
+import kamon.trace.{ Tracer, TraceContext }
+import kamon.util.MilliTimestamp
import org.aspectj.lang.ProceedingJoinPoint
import org.aspectj.lang.annotation._
@@ -32,15 +32,13 @@ class RemotingInstrumentation {
envelopeBuilder.setMessage(serializedMessage)
// Attach the TraceContext info, if available.
- if (!TraceRecorder.currentContext.isEmpty) {
- val context = TraceRecorder.currentContext
- val relativeStartMilliTime = System.currentTimeMillis - ((System.nanoTime - context.startRelativeTimestamp.nanos) / 1000000)
+ TraceContext.map { context ⇒
envelopeBuilder.setTraceContext(RemoteTraceContext.newBuilder()
.setTraceName(context.name)
.setTraceToken(context.token)
.setIsOpen(context.isOpen)
- .setStartMilliTime(relativeStartMilliTime)
+ .setStartMilliTime(context.startTimestamp.toMilliTimestamp.millis)
.build())
}
@@ -85,14 +83,16 @@ class RemotingInstrumentation {
if (ackAndEnvelope.hasEnvelope && ackAndEnvelope.getEnvelope.hasTraceContext) {
val remoteTraceContext = ackAndEnvelope.getEnvelope.getTraceContext
val system = provider.guardian.underlying.system
- val ctx = TraceRecorder.joinRemoteTraceContext(
- remoteTraceContext.getTraceName(),
- remoteTraceContext.getTraceToken(),
- new MilliTimestamp(remoteTraceContext.getStartMilliTime()),
- remoteTraceContext.getIsOpen(),
- system)
-
- TraceRecorder.setContext(ctx)
+ val tracer = Tracer.get(system)
+
+ val ctx = tracer.newContext(
+ remoteTraceContext.getTraceName,
+ remoteTraceContext.getTraceToken,
+ new MilliTimestamp(remoteTraceContext.getStartMilliTime()).toRelativeNanoTimestamp,
+ remoteTraceContext.getIsOpen,
+ isLocal = false)
+
+ TraceContext.setCurrentContext(ctx)
}
pjp.proceed()
diff --git a/kamon-akka-remote/src/test/resources/logback.xml b/kamon-akka-remote/src/test/resources/logback.xml
new file mode 100644
index 00000000..dd623d61
--- /dev/null
+++ b/kamon-akka-remote/src/test/resources/logback.xml
@@ -0,0 +1,17 @@
+<configuration scan="true">
+ <contextListener class="ch.qos.logback.classic.jul.LevelChangePropagator">
+ <resetJUL>true</resetJUL>
+ </contextListener>
+
+ <conversionRule conversionWord="traceToken" converterClass="kamon.trace.logging.LogbackTraceTokenConverter"/>
+
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%date{HH:mm:ss.SSS} %-5level [%traceToken][%thread] %logger{55} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="error">
+ <appender-ref ref="STDOUT"/>
+ </root>
+</configuration>
diff --git a/kamon-akka-remote/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala b/kamon-akka-remote/src/test/scala/kamon/akka/instrumentation/RemotingInstrumentationSpec.scala
index 8a3973ca..367a7349 100644
--- a/kamon-akka-remote/src/test/scala/kamon/instrumentation/akka/RemotingInstrumentationSpec.scala
+++ b/kamon-akka-remote/src/test/scala/kamon/akka/instrumentation/RemotingInstrumentationSpec.scala
@@ -8,7 +8,8 @@ import akka.routing.RoundRobinGroup
import akka.testkit.{ ImplicitSender, TestKitBase }
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
-import kamon.trace.TraceRecorder
+import kamon.Kamon
+import kamon.trace.TraceContext
import org.scalatest.{ Matchers, WordSpecLike }
import scala.concurrent.duration._
@@ -18,6 +19,8 @@ class RemotingInstrumentationSpec extends TestKitBase with WordSpecLike with Mat
implicit lazy val system: ActorSystem = ActorSystem("remoting-spec-local-system", ConfigFactory.parseString(
"""
|akka {
+ | loggers = ["akka.event.slf4j.Slf4jLogger"]
+ |
| actor {
| provider = "akka.remote.RemoteActorRefProvider"
| }
@@ -34,6 +37,8 @@ class RemotingInstrumentationSpec extends TestKitBase with WordSpecLike with Mat
val remoteSystem: ActorSystem = ActorSystem("remoting-spec-remote-system", ConfigFactory.parseString(
"""
|akka {
+ | loggers = ["akka.event.slf4j.Slf4jLogger"]
+ |
| actor {
| provider = "akka.remote.RemoteActorRefProvider"
| }
@@ -47,11 +52,13 @@ class RemotingInstrumentationSpec extends TestKitBase with WordSpecLike with Mat
|}
""".stripMargin))
+ lazy val kamon = Kamon(system)
val RemoteSystemAddress = AddressFromURIString("akka.tcp://remoting-spec-remote-system@127.0.0.1:2553")
+ import kamon.tracer.newContext
"The Remoting instrumentation" should {
"propagate the TraceContext when creating a new remote actor" in {
- TraceRecorder.withNewTraceContext("deploy-remote-actor", Some("deploy-remote-actor-1")) {
+ TraceContext.withContext(newContext("deploy-remote-actor", "deploy-remote-actor-1")) {
system.actorOf(TraceTokenReplier.remoteProps(Some(testActor), RemoteSystemAddress), "remote-deploy-fixture")
}
@@ -61,7 +68,7 @@ class RemotingInstrumentationSpec extends TestKitBase with WordSpecLike with Mat
"propagate the TraceContext when sending a message to a remotely deployed actor" in {
val remoteRef = system.actorOf(TraceTokenReplier.remoteProps(None, RemoteSystemAddress), "remote-message-fixture")
- TraceRecorder.withNewTraceContext("message-remote-actor", Some("message-remote-actor-1")) {
+ TraceContext.withContext(newContext("message-remote-actor", "message-remote-actor-1")) {
remoteRef ! "reply-trace-token"
}
@@ -73,7 +80,7 @@ class RemotingInstrumentationSpec extends TestKitBase with WordSpecLike with Mat
implicit val askTimeout = Timeout(10 seconds)
val remoteRef = system.actorOf(TraceTokenReplier.remoteProps(None, RemoteSystemAddress), "remote-ask-and-pipe-fixture")
- TraceRecorder.withNewTraceContext("ask-and-pipe-remote-actor", Some("ask-and-pipe-remote-actor-1")) {
+ TraceContext.withContext(newContext("ask-and-pipe-remote-actor", "ask-and-pipe-remote-actor-1")) {
(remoteRef ? "reply-trace-token") pipeTo (testActor)
}
@@ -85,7 +92,7 @@ class RemotingInstrumentationSpec extends TestKitBase with WordSpecLike with Mat
remoteSystem.actorOf(TraceTokenReplier.props(None), "actor-selection-target-b")
val selection = system.actorSelection(RemoteSystemAddress + "/user/actor-selection-target-*")
- TraceRecorder.withNewTraceContext("message-remote-actor-selection", Some("message-remote-actor-selection-1")) {
+ TraceContext.withContext(newContext("message-remote-actor-selection", "message-remote-actor-selection-1")) {
selection ! "reply-trace-token"
}
@@ -97,7 +104,7 @@ class RemotingInstrumentationSpec extends TestKitBase with WordSpecLike with Mat
"propagate the TraceContext a remotely supervised child fails" in {
val supervisor = system.actorOf(Props(new SupervisorOfRemote(testActor, RemoteSystemAddress)))
- TraceRecorder.withNewTraceContext("remote-supervision", Some("remote-supervision-1")) {
+ TraceContext.withContext(newContext("remote-supervision", "remote-supervision-1")) {
supervisor ! "fail"
}
@@ -108,7 +115,7 @@ class RemotingInstrumentationSpec extends TestKitBase with WordSpecLike with Mat
remoteSystem.actorOf(TraceTokenReplier.props(None), "remote-routee")
val router = system.actorOf(RoundRobinGroup(List(RemoteSystemAddress + "/user/actor-selection-target-*")).props(), "router")
- TraceRecorder.withNewTraceContext("remote-routee", Some("remote-routee-1")) {
+ TraceContext.withContext(newContext("remote-routee", "remote-routee-1")) {
router ! "reply-trace-token"
}
@@ -127,12 +134,11 @@ class TraceTokenReplier(creationTraceContextListener: Option[ActorRef]) extends
case "fail" ⇒
throw new ArithmeticException("Division by zero.")
case "reply-trace-token" ⇒
- log.info("Sending back the TT: " + TraceRecorder.currentContext.token)
sender ! currentTraceContextInfo
}
def currentTraceContextInfo: String = {
- val ctx = TraceRecorder.currentContext
+ val ctx = TraceContext.currentContext
s"name=${ctx.name}|token=${ctx.token}|isOpen=${ctx.isOpen}"
}
}
@@ -161,7 +167,7 @@ class SupervisorOfRemote(traceContextListener: ActorRef, remoteAddress: Address)
}
def currentTraceContextInfo: String = {
- val ctx = TraceRecorder.currentContext
+ val ctx = TraceContext.currentContext
s"name=${ctx.name}|token=${ctx.token}|isOpen=${ctx.isOpen}"
}
}