aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/instrumentation
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2014-10-24 23:58:57 +0200
committerIvan Topolnjak <ivantopo@gmail.com>2014-10-24 23:58:57 +0200
commit30940181424be69e0fd64e945fe5a64b4523457b (patch)
tree1a01e2f895a76b978944abc13aef25f180f71c44 /kamon-core/src/main/scala/kamon/instrumentation
parent4663779e89c87684317bdfc468ff864754d6fad3 (diff)
downloadKamon-30940181424be69e0fd64e945fe5a64b4523457b.tar.gz
Kamon-30940181424be69e0fd64e945fe5a64b4523457b.tar.bz2
Kamon-30940181424be69e0fd64e945fe5a64b4523457b.zip
+ akka-remote: create a new kamon-akka-remote module, closes #99
Diffstat (limited to 'kamon-core/src/main/scala/kamon/instrumentation')
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala91
1 files changed, 0 insertions, 91 deletions
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala
deleted file mode 100644
index 00747935..00000000
--- a/kamon-core/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-package akka.remote.instrumentation
-
-import akka.actor.{ ActorRef, Address }
-import akka.remote.instrumentation.TraceContextAwareWireFormats.{ TraceContextAwareRemoteEnvelope, RemoteTraceContext, AckAndTraceContextAwareEnvelopeContainer }
-import akka.remote.{ RemoteActorRefProvider, Ack, SeqNo }
-import akka.remote.WireFormats._
-import akka.util.ByteString
-import kamon.trace.TraceRecorder
-import org.aspectj.lang.ProceedingJoinPoint
-import org.aspectj.lang.annotation._
-
-@Aspect
-class RemotingInstrumentation {
-
- @Pointcut("execution(* akka.remote.transport.AkkaPduProtobufCodec$.constructMessage(..)) && " +
- "args(localAddress, recipient, serializedMessage, senderOption, seqOption, ackOption)")
- def constructAkkaPduMessage(localAddress: Address, recipient: ActorRef, serializedMessage: SerializedMessage,
- senderOption: Option[ActorRef], seqOption: Option[SeqNo], ackOption: Option[Ack]): Unit = {}
-
- @Around("constructAkkaPduMessage(localAddress, recipient, serializedMessage, senderOption, seqOption, ackOption)")
- def aroundSerializeRemoteMessage(pjp: ProceedingJoinPoint, localAddress: Address, recipient: ActorRef,
- serializedMessage: SerializedMessage, senderOption: Option[ActorRef], seqOption: Option[SeqNo], ackOption: Option[Ack]): AnyRef = {
-
- val ackAndEnvelopeBuilder = AckAndTraceContextAwareEnvelopeContainer.newBuilder
- val envelopeBuilder = TraceContextAwareRemoteEnvelope.newBuilder
-
- envelopeBuilder.setRecipient(serializeActorRef(recipient.path.address, recipient))
- senderOption foreach { ref ⇒ envelopeBuilder.setSender(serializeActorRef(localAddress, ref)) }
- seqOption foreach { seq ⇒ envelopeBuilder.setSeq(seq.rawValue) }
- ackOption foreach { ack ⇒ ackAndEnvelopeBuilder.setAck(ackBuilder(ack)) }
- envelopeBuilder.setMessage(serializedMessage)
-
- // Attach the TraceContext info, if available.
- TraceRecorder.currentContext.foreach { context ⇒
- envelopeBuilder.setTraceContext(RemoteTraceContext.newBuilder()
- .setTraceName(context.name)
- .setTraceToken(context.token)
- .setIsOpen(context.isOpen)
- .setStartMilliTime(context.startMilliTime)
- .build())
- }
-
- ackAndEnvelopeBuilder.setEnvelope(envelopeBuilder)
- ByteString.ByteString1C(ackAndEnvelopeBuilder.build.toByteArray) //Reuse Byte Array (naughty!)
- }
-
- // Copied from akka.remote.transport.AkkaPduProtobufCodec because of private access.
- private def ackBuilder(ack: Ack): AcknowledgementInfo.Builder = {
- val ackBuilder = AcknowledgementInfo.newBuilder()
- ackBuilder.setCumulativeAck(ack.cumulativeAck.rawValue)
- ack.nacks foreach { nack ⇒ ackBuilder.addNacks(nack.rawValue) }
- ackBuilder
- }
-
- // Copied from akka.remote.transport.AkkaPduProtobufCodec because of private access.
- private def serializeActorRef(defaultAddress: Address, ref: ActorRef): ActorRefData = {
- ActorRefData.newBuilder.setPath(
- if (ref.path.address.host.isDefined) ref.path.toSerializationFormat
- else ref.path.toSerializationFormatWithAddress(defaultAddress)).build()
- }
-
- // Copied from akka.remote.transport.AkkaPduProtobufCodec because of private access.
- private def serializeAddress(address: Address): AddressData = address match {
- case Address(protocol, system, Some(host), Some(port)) ⇒
- AddressData.newBuilder
- .setHostname(host)
- .setPort(port)
- .setSystem(system)
- .setProtocol(protocol)
- .build()
- case _ ⇒ throw new IllegalArgumentException(s"Address [${address}] could not be serialized: host or port missing.")
- }
-
- @Pointcut("execution(* akka.remote.transport.AkkaPduProtobufCodec$.decodeMessage(..)) && args(bs, provider, localAddress)")
- def decodeRemoteMessage(bs: ByteString, provider: RemoteActorRefProvider, localAddress: Address): Unit = {}
-
- @Around("decodeRemoteMessage(bs, provider, localAddress)")
- def aroundDecodeRemoteMessage(pjp: ProceedingJoinPoint, bs: ByteString, provider: RemoteActorRefProvider, localAddress: Address): AnyRef = {
- val ackAndEnvelope = AckAndTraceContextAwareEnvelopeContainer.parseFrom(bs.toArray)
-
- if (ackAndEnvelope.hasEnvelope && ackAndEnvelope.getEnvelope.hasTraceContext) {
- val traceContext = ackAndEnvelope.getEnvelope.getTraceContext
- val system = provider.guardian.underlying.system
- val tc = TraceRecorder.joinRemoteTraceContext(traceContext, system)
-
- TraceRecorder.setContext(Some(tc))
- }
-
- pjp.proceed()
- }
-}