From 3e2c2b3ba39ad8cca4874e3be3004f8a182dab36 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Fri, 24 Oct 2014 23:58:57 +0200 Subject: + akka-remote: create a new kamon-akka-remote module, closes #99 --- .../akka/RemotingInstrumentation.scala | 91 ---------------------- .../src/main/scala/kamon/trace/TraceRecorder.scala | 21 ++--- 2 files changed, 5 insertions(+), 107 deletions(-) delete mode 100644 kamon-core/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala (limited to 'kamon-core/src/main/scala') diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala deleted file mode 100644 index 00747935..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala +++ /dev/null @@ -1,91 +0,0 @@ -package akka.remote.instrumentation - -import akka.actor.{ ActorRef, Address } -import akka.remote.instrumentation.TraceContextAwareWireFormats.{ TraceContextAwareRemoteEnvelope, RemoteTraceContext, AckAndTraceContextAwareEnvelopeContainer } -import akka.remote.{ RemoteActorRefProvider, Ack, SeqNo } -import akka.remote.WireFormats._ -import akka.util.ByteString -import kamon.trace.TraceRecorder -import org.aspectj.lang.ProceedingJoinPoint -import org.aspectj.lang.annotation._ - -@Aspect -class RemotingInstrumentation { - - @Pointcut("execution(* akka.remote.transport.AkkaPduProtobufCodec$.constructMessage(..)) && " + - "args(localAddress, recipient, serializedMessage, senderOption, seqOption, ackOption)") - def constructAkkaPduMessage(localAddress: Address, recipient: ActorRef, serializedMessage: SerializedMessage, - senderOption: Option[ActorRef], seqOption: Option[SeqNo], ackOption: Option[Ack]): Unit = {} - - @Around("constructAkkaPduMessage(localAddress, recipient, serializedMessage, senderOption, seqOption, ackOption)") - def aroundSerializeRemoteMessage(pjp: ProceedingJoinPoint, localAddress: Address, recipient: ActorRef, - serializedMessage: SerializedMessage, senderOption: Option[ActorRef], seqOption: Option[SeqNo], ackOption: Option[Ack]): AnyRef = { - - val ackAndEnvelopeBuilder = AckAndTraceContextAwareEnvelopeContainer.newBuilder - val envelopeBuilder = TraceContextAwareRemoteEnvelope.newBuilder - - envelopeBuilder.setRecipient(serializeActorRef(recipient.path.address, recipient)) - senderOption foreach { ref ⇒ envelopeBuilder.setSender(serializeActorRef(localAddress, ref)) } - seqOption foreach { seq ⇒ envelopeBuilder.setSeq(seq.rawValue) } - ackOption foreach { ack ⇒ ackAndEnvelopeBuilder.setAck(ackBuilder(ack)) } - envelopeBuilder.setMessage(serializedMessage) - - // Attach the TraceContext info, if available. - TraceRecorder.currentContext.foreach { context ⇒ - envelopeBuilder.setTraceContext(RemoteTraceContext.newBuilder() - .setTraceName(context.name) - .setTraceToken(context.token) - .setIsOpen(context.isOpen) - .setStartMilliTime(context.startMilliTime) - .build()) - } - - ackAndEnvelopeBuilder.setEnvelope(envelopeBuilder) - ByteString.ByteString1C(ackAndEnvelopeBuilder.build.toByteArray) //Reuse Byte Array (naughty!) - } - - // Copied from akka.remote.transport.AkkaPduProtobufCodec because of private access. - private def ackBuilder(ack: Ack): AcknowledgementInfo.Builder = { - val ackBuilder = AcknowledgementInfo.newBuilder() - ackBuilder.setCumulativeAck(ack.cumulativeAck.rawValue) - ack.nacks foreach { nack ⇒ ackBuilder.addNacks(nack.rawValue) } - ackBuilder - } - - // Copied from akka.remote.transport.AkkaPduProtobufCodec because of private access. - private def serializeActorRef(defaultAddress: Address, ref: ActorRef): ActorRefData = { - ActorRefData.newBuilder.setPath( - if (ref.path.address.host.isDefined) ref.path.toSerializationFormat - else ref.path.toSerializationFormatWithAddress(defaultAddress)).build() - } - - // Copied from akka.remote.transport.AkkaPduProtobufCodec because of private access. - private def serializeAddress(address: Address): AddressData = address match { - case Address(protocol, system, Some(host), Some(port)) ⇒ - AddressData.newBuilder - .setHostname(host) - .setPort(port) - .setSystem(system) - .setProtocol(protocol) - .build() - case _ ⇒ throw new IllegalArgumentException(s"Address [${address}] could not be serialized: host or port missing.") - } - - @Pointcut("execution(* akka.remote.transport.AkkaPduProtobufCodec$.decodeMessage(..)) && args(bs, provider, localAddress)") - def decodeRemoteMessage(bs: ByteString, provider: RemoteActorRefProvider, localAddress: Address): Unit = {} - - @Around("decodeRemoteMessage(bs, provider, localAddress)") - def aroundDecodeRemoteMessage(pjp: ProceedingJoinPoint, bs: ByteString, provider: RemoteActorRefProvider, localAddress: Address): AnyRef = { - val ackAndEnvelope = AckAndTraceContextAwareEnvelopeContainer.parseFrom(bs.toArray) - - if (ackAndEnvelope.hasEnvelope && ackAndEnvelope.getEnvelope.hasTraceContext) { - val traceContext = ackAndEnvelope.getEnvelope.getTraceContext - val system = provider.guardian.underlying.system - val tc = TraceRecorder.joinRemoteTraceContext(traceContext, system) - - TraceRecorder.setContext(Some(tc)) - } - - pjp.proceed() - } -} diff --git a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala index bc7a0db2..778edc42 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala @@ -16,8 +16,6 @@ package kamon.trace -import akka.remote.instrumentation.TraceContextAwareWireFormats.RemoteTraceContext - import scala.language.experimental.macros import java.util.concurrent.atomic.AtomicLong import kamon.macros.InlineTraceContextMacro @@ -45,24 +43,15 @@ object TraceRecorder { new SimpleMetricCollectionContext(name, finalToken, metadata, TraceContextOrigin.Local, system) } - def joinRemoteTraceContext(remoteTraceContext: RemoteTraceContext, system: ActorSystem): TraceContext = { + def joinRemoteTraceContext(traceName: String, traceToken: String, startMilliTime: Long, isOpen: Boolean, system: ActorSystem): TraceContext = { new SimpleMetricCollectionContext( - remoteTraceContext.getTraceName(), - remoteTraceContext.getTraceToken(), + traceName, + traceToken, Map.empty, TraceContextOrigin.Remote, system, - remoteTraceContext.getStartMilliTime(), - remoteTraceContext.getIsOpen()) - } - - def forkTraceContext(context: TraceContext, newName: String): TraceContext = { - new SimpleMetricCollectionContext( - newName, - context.token, - Map.empty, - TraceContextOrigin.Local, - context.system) + startMilliTime, + isOpen) } def setContext(context: Option[TraceContext]): Unit = traceContextStorage.set(context) -- cgit v1.2.3