aboutsummaryrefslogtreecommitdiff
path: root/kamon-akka-remote/src/main/scala/kamon/akka/instrumentation/RemotingInstrumentation.scala
blob: 7564cfd042fcbd9e47d6253238a111347841d0b6 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package akka.kamon.instrumentation

import akka.actor.{ ActorRef, Address }
import akka.remote.instrumentation.TraceContextAwareWireFormats.{ TraceContextAwareRemoteEnvelope, RemoteTraceContext, AckAndTraceContextAwareEnvelopeContainer }
import akka.remote.{ RemoteActorRefProvider, Ack, SeqNo }
import akka.remote.WireFormats._
import akka.util.ByteString
import kamon.Kamon
import kamon.trace.{ Tracer, TraceContext }
import kamon.util.MilliTimestamp
import org.aspectj.lang.ProceedingJoinPoint
import org.aspectj.lang.annotation._

@Aspect
class RemotingInstrumentation {

  @Pointcut("execution(* akka.remote.transport.AkkaPduProtobufCodec$.constructMessage(..)) && " +
    "args(localAddress, recipient, serializedMessage, senderOption, seqOption, ackOption)")
  def constructAkkaPduMessage(localAddress: Address, recipient: ActorRef, serializedMessage: SerializedMessage,
    senderOption: Option[ActorRef], seqOption: Option[SeqNo], ackOption: Option[Ack]): Unit = {}

  @Around("constructAkkaPduMessage(localAddress, recipient, serializedMessage, senderOption, seqOption, ackOption)")
  def aroundSerializeRemoteMessage(pjp: ProceedingJoinPoint, localAddress: Address, recipient: ActorRef,
    serializedMessage: SerializedMessage, senderOption: Option[ActorRef], seqOption: Option[SeqNo], ackOption: Option[Ack]): AnyRef = {

    val ackAndEnvelopeBuilder = AckAndTraceContextAwareEnvelopeContainer.newBuilder
    val envelopeBuilder = TraceContextAwareRemoteEnvelope.newBuilder

    envelopeBuilder.setRecipient(serializeActorRef(recipient.path.address, recipient))
    senderOption foreach { ref  envelopeBuilder.setSender(serializeActorRef(localAddress, ref)) }
    seqOption foreach { seq  envelopeBuilder.setSeq(seq.rawValue) }
    ackOption foreach { ack  ackAndEnvelopeBuilder.setAck(ackBuilder(ack)) }
    envelopeBuilder.setMessage(serializedMessage)

    // Attach the TraceContext info, if available.
    Tracer.currentContext.collect { context 

      envelopeBuilder.setTraceContext(RemoteTraceContext.newBuilder()
        .setTraceName(context.name)
        .setTraceToken(context.token)
        .setIsOpen(context.isOpen)
        .setStartMilliTime(context.startTimestamp.toMilliTimestamp.millis)
        .build())
    }

    ackAndEnvelopeBuilder.setEnvelope(envelopeBuilder)
    ByteString.ByteString1C(ackAndEnvelopeBuilder.build.toByteArray) //Reuse Byte Array (naughty!)
  }

  // Copied from akka.remote.transport.AkkaPduProtobufCodec because of private access.
  private def ackBuilder(ack: Ack): AcknowledgementInfo.Builder = {
    val ackBuilder = AcknowledgementInfo.newBuilder()
    ackBuilder.setCumulativeAck(ack.cumulativeAck.rawValue)
    ack.nacks foreach { nack  ackBuilder.addNacks(nack.rawValue) }
    ackBuilder
  }

  // Copied from akka.remote.transport.AkkaPduProtobufCodec because of private access.
  private def serializeActorRef(defaultAddress: Address, ref: ActorRef): ActorRefData = {
    ActorRefData.newBuilder.setPath(
      if (ref.path.address.host.isDefined) ref.path.toSerializationFormat
      else ref.path.toSerializationFormatWithAddress(defaultAddress)).build()
  }

  // Copied from akka.remote.transport.AkkaPduProtobufCodec because of private access.
  private def serializeAddress(address: Address): AddressData = address match {
    case Address(protocol, system, Some(host), Some(port)) 
      AddressData.newBuilder
        .setHostname(host)
        .setPort(port)
        .setSystem(system)
        .setProtocol(protocol)
        .build()
    case _  throw new IllegalArgumentException(s"Address [${address}] could not be serialized: host or port missing.")
  }

  @Pointcut("execution(* akka.remote.transport.AkkaPduProtobufCodec$.decodeMessage(..)) && args(bs, provider, localAddress)")
  def decodeRemoteMessage(bs: ByteString, provider: RemoteActorRefProvider, localAddress: Address): Unit = {}

  @Around("decodeRemoteMessage(bs, provider, localAddress)")
  def aroundDecodeRemoteMessage(pjp: ProceedingJoinPoint, bs: ByteString, provider: RemoteActorRefProvider, localAddress: Address): AnyRef = {
    val ackAndEnvelope = AckAndTraceContextAwareEnvelopeContainer.parseFrom(bs.toArray)

    if (ackAndEnvelope.hasEnvelope && ackAndEnvelope.getEnvelope.hasTraceContext) {
      val remoteTraceContext = ackAndEnvelope.getEnvelope.getTraceContext
      val system = provider.guardian.underlying.system
      val tracer = Kamon.tracer

      val ctx = tracer.newContext(
        remoteTraceContext.getTraceName,
        Option(remoteTraceContext.getTraceToken),
        new MilliTimestamp(remoteTraceContext.getStartMilliTime()).toRelativeNanoTimestamp,
        remoteTraceContext.getIsOpen,
        isLocal = false)

      Tracer.setCurrentContext(ctx)
    }

    pjp.proceed()
  }
}