From 3e2c2b3ba39ad8cca4874e3be3004f8a182dab36 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Fri, 24 Oct 2014 23:58:57 +0200 Subject: + akka-remote: create a new kamon-akka-remote module, closes #99 --- .../akka/RemotingInstrumentation.scala | 96 ++++++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 kamon-akka-remote/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala (limited to 'kamon-akka-remote/src/main/scala/kamon/instrumentation/akka') diff --git a/kamon-akka-remote/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala b/kamon-akka-remote/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala new file mode 100644 index 00000000..3278ec67 --- /dev/null +++ b/kamon-akka-remote/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala @@ -0,0 +1,96 @@ +package akka.remote.instrumentation + +import akka.actor.{ ActorRef, Address } +import akka.remote.instrumentation.TraceContextAwareWireFormats.{ TraceContextAwareRemoteEnvelope, RemoteTraceContext, AckAndTraceContextAwareEnvelopeContainer } +import akka.remote.{ RemoteActorRefProvider, Ack, SeqNo } +import akka.remote.WireFormats._ +import akka.util.ByteString +import kamon.trace.TraceRecorder +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation._ + +@Aspect +class RemotingInstrumentation { + + @Pointcut("execution(* akka.remote.transport.AkkaPduProtobufCodec$.constructMessage(..)) && " + + "args(localAddress, recipient, serializedMessage, senderOption, seqOption, ackOption)") + def constructAkkaPduMessage(localAddress: Address, recipient: ActorRef, serializedMessage: SerializedMessage, + senderOption: Option[ActorRef], seqOption: Option[SeqNo], ackOption: Option[Ack]): Unit = {} + + @Around("constructAkkaPduMessage(localAddress, recipient, serializedMessage, senderOption, seqOption, ackOption)") + def aroundSerializeRemoteMessage(pjp: ProceedingJoinPoint, localAddress: Address, recipient: ActorRef, + serializedMessage: SerializedMessage, senderOption: Option[ActorRef], seqOption: Option[SeqNo], ackOption: Option[Ack]): AnyRef = { + + val ackAndEnvelopeBuilder = AckAndTraceContextAwareEnvelopeContainer.newBuilder + val envelopeBuilder = TraceContextAwareRemoteEnvelope.newBuilder + + envelopeBuilder.setRecipient(serializeActorRef(recipient.path.address, recipient)) + senderOption foreach { ref ⇒ envelopeBuilder.setSender(serializeActorRef(localAddress, ref)) } + seqOption foreach { seq ⇒ envelopeBuilder.setSeq(seq.rawValue) } + ackOption foreach { ack ⇒ ackAndEnvelopeBuilder.setAck(ackBuilder(ack)) } + envelopeBuilder.setMessage(serializedMessage) + + // Attach the TraceContext info, if available. + TraceRecorder.currentContext.foreach { context ⇒ + envelopeBuilder.setTraceContext(RemoteTraceContext.newBuilder() + .setTraceName(context.name) + .setTraceToken(context.token) + .setIsOpen(context.isOpen) + .setStartMilliTime(context.startMilliTime) + .build()) + } + + ackAndEnvelopeBuilder.setEnvelope(envelopeBuilder) + ByteString.ByteString1C(ackAndEnvelopeBuilder.build.toByteArray) //Reuse Byte Array (naughty!) + } + + // Copied from akka.remote.transport.AkkaPduProtobufCodec because of private access. + private def ackBuilder(ack: Ack): AcknowledgementInfo.Builder = { + val ackBuilder = AcknowledgementInfo.newBuilder() + ackBuilder.setCumulativeAck(ack.cumulativeAck.rawValue) + ack.nacks foreach { nack ⇒ ackBuilder.addNacks(nack.rawValue) } + ackBuilder + } + + // Copied from akka.remote.transport.AkkaPduProtobufCodec because of private access. + private def serializeActorRef(defaultAddress: Address, ref: ActorRef): ActorRefData = { + ActorRefData.newBuilder.setPath( + if (ref.path.address.host.isDefined) ref.path.toSerializationFormat + else ref.path.toSerializationFormatWithAddress(defaultAddress)).build() + } + + // Copied from akka.remote.transport.AkkaPduProtobufCodec because of private access. + private def serializeAddress(address: Address): AddressData = address match { + case Address(protocol, system, Some(host), Some(port)) ⇒ + AddressData.newBuilder + .setHostname(host) + .setPort(port) + .setSystem(system) + .setProtocol(protocol) + .build() + case _ ⇒ throw new IllegalArgumentException(s"Address [${address}] could not be serialized: host or port missing.") + } + + @Pointcut("execution(* akka.remote.transport.AkkaPduProtobufCodec$.decodeMessage(..)) && args(bs, provider, localAddress)") + def decodeRemoteMessage(bs: ByteString, provider: RemoteActorRefProvider, localAddress: Address): Unit = {} + + @Around("decodeRemoteMessage(bs, provider, localAddress)") + def aroundDecodeRemoteMessage(pjp: ProceedingJoinPoint, bs: ByteString, provider: RemoteActorRefProvider, localAddress: Address): AnyRef = { + val ackAndEnvelope = AckAndTraceContextAwareEnvelopeContainer.parseFrom(bs.toArray) + + if (ackAndEnvelope.hasEnvelope && ackAndEnvelope.getEnvelope.hasTraceContext) { + val remoteTraceContext = ackAndEnvelope.getEnvelope.getTraceContext + val system = provider.guardian.underlying.system + val tc = TraceRecorder.joinRemoteTraceContext( + remoteTraceContext.getTraceName(), + remoteTraceContext.getTraceToken(), + remoteTraceContext.getStartMilliTime(), + remoteTraceContext.getIsOpen(), + system) + + TraceRecorder.setContext(Some(tc)) + } + + pjp.proceed() + } +} -- cgit v1.2.3