diff options
Diffstat (limited to 'kamon-core/src/main/scala')
3 files changed, 159 insertions, 10 deletions
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala new file mode 100644 index 00000000..341b0ee7 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala @@ -0,0 +1,91 @@ +package akka.remote.instrumentation + +import akka.actor.{ ActorRef, Address } +import akka.dispatch.sysmsg.SystemMessage +import akka.remote.instrumentation.TraceContextAwareWireFormats.{ TraceContextAwareRemoteEnvelope, RemoteTraceContext, AckAndTraceContextAwareEnvelopeContainer } +import akka.remote.transport.AkkaPduCodec.Message +import akka.remote.{ RemoteActorRefProvider, Ack, SeqNo } +import akka.remote.WireFormats._ +import akka.util.ByteString +import kamon.trace.{ TraceContextAware, TraceRecorder } +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation._ + +@Aspect +class RemotingInstrumentation { + + @Pointcut("execution(* akka.remote.transport.AkkaPduProtobufCodec$.constructMessage(..)) && args(localAddress, recipient, serializedMessage, senderOption, seqOption, ackOption)") + def constructAkkaPduMessage(localAddress: Address, recipient: ActorRef, serializedMessage: SerializedMessage, + senderOption: Option[ActorRef], seqOption: Option[SeqNo], ackOption: Option[Ack]): Unit = {} + + @Around("constructAkkaPduMessage(localAddress, recipient, serializedMessage, senderOption, seqOption, ackOption)") + def aroundSerializeRemoteMessage(pjp: ProceedingJoinPoint, localAddress: Address, recipient: ActorRef, + serializedMessage: SerializedMessage, senderOption: Option[ActorRef], seqOption: Option[SeqNo], ackOption: Option[Ack]): AnyRef = { + + val ackAndEnvelopeBuilder = AckAndTraceContextAwareEnvelopeContainer.newBuilder + val envelopeBuilder = TraceContextAwareRemoteEnvelope.newBuilder + + envelopeBuilder.setRecipient(serializeActorRef(recipient.path.address, recipient)) + senderOption foreach { ref ⇒ envelopeBuilder.setSender(serializeActorRef(localAddress, ref)) } + seqOption foreach { seq ⇒ envelopeBuilder.setSeq(seq.rawValue) } + ackOption foreach { ack ⇒ ackAndEnvelopeBuilder.setAck(ackBuilder(ack)) } + envelopeBuilder.setMessage(serializedMessage) + + // Attach the TraceContext info, if available. + TraceRecorder.currentContext.foreach { context ⇒ + envelopeBuilder.setTraceContext(RemoteTraceContext.newBuilder() + .setTraceName(context.name) + .setTraceToken(context.token) + .setIsOpen(context.isOpen) + .setStartMilliTime(context.startMilliTime) + .build()) + } + + ackAndEnvelopeBuilder.setEnvelope(envelopeBuilder) + ByteString.ByteString1C(ackAndEnvelopeBuilder.build.toByteArray) //Reuse Byte Array (naughty!) + } + + // Copied from akka.remote.transport.AkkaPduProtobufCodec because of private access. + private def ackBuilder(ack: Ack): AcknowledgementInfo.Builder = { + val ackBuilder = AcknowledgementInfo.newBuilder() + ackBuilder.setCumulativeAck(ack.cumulativeAck.rawValue) + ack.nacks foreach { nack ⇒ ackBuilder.addNacks(nack.rawValue) } + ackBuilder + } + + // Copied from akka.remote.transport.AkkaPduProtobufCodec because of private access. + private def serializeActorRef(defaultAddress: Address, ref: ActorRef): ActorRefData = { + ActorRefData.newBuilder.setPath( + if (ref.path.address.host.isDefined) ref.path.toSerializationFormat else ref.path.toSerializationFormatWithAddress(defaultAddress)).build() + } + + // Copied from akka.remote.transport.AkkaPduProtobufCodec because of private access. + private def serializeAddress(address: Address): AddressData = address match { + case Address(protocol, system, Some(host), Some(port)) ⇒ + AddressData.newBuilder + .setHostname(host) + .setPort(port) + .setSystem(system) + .setProtocol(protocol) + .build() + case _ ⇒ throw new IllegalArgumentException(s"Address [${address}] could not be serialized: host or port missing.") + } + + @Pointcut("execution(* akka.remote.transport.AkkaPduProtobufCodec$.decodeMessage(..)) && args(bs, provider, localAddress)") // && args(raw, provider, localAddress)") + def decodeRemoteMessage(bs: ByteString, provider: RemoteActorRefProvider, localAddress: Address): Unit = {} //(raw: ByteString, provider: RemoteActorRefProvider, localAddress: Address): Unit = {} + + @Around("decodeRemoteMessage(bs, provider, localAddress)") + def aroundDecodeRemoteMessage(pjp: ProceedingJoinPoint, bs: ByteString, provider: RemoteActorRefProvider, localAddress: Address): AnyRef = { + val ackAndEnvelope = AckAndTraceContextAwareEnvelopeContainer.parseFrom(bs.toArray) + + if (ackAndEnvelope.hasEnvelope && ackAndEnvelope.getEnvelope.hasTraceContext) { + val traceContext = ackAndEnvelope.getEnvelope.getTraceContext + val system = provider.guardian.underlying.system + val tc = TraceRecorder.joinRemoteTraceContext(traceContext, system) + + TraceRecorder.setContext(Some(tc)) + } + + pjp.proceed() + } +} diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala index 9ce3cd4e..6ea30511 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala @@ -16,6 +16,8 @@ package kamon.trace +import java.io.ObjectStreamException + import akka.actor.ActorSystem import kamon.Kamon import kamon.metric._ @@ -32,6 +34,9 @@ trait TraceContext { def levelOfDetail: TracingLevelOfDetail def startSegment(identity: SegmentIdentity, metadata: Map[String, String]): SegmentCompletionHandle def finish(metadata: Map[String, String]) + def origin: TraceContextOrigin + def startMilliTime: Long + def isOpen: Boolean private[kamon] val traceLocalStorage: TraceLocalStorage = new TraceLocalStorage } @@ -51,7 +56,13 @@ case object OnlyMetrics extends TracingLevelOfDetail case object SimpleTrace extends TracingLevelOfDetail case object FullTrace extends TracingLevelOfDetail -trait TraceContextAware { +sealed trait TraceContextOrigin +object TraceContextOrigin { + case object Local extends TraceContextOrigin + case object Remote extends TraceContextOrigin +} + +trait TraceContextAware extends Serializable { def captureNanoTime: Long def traceContext: Option[TraceContext] } @@ -60,8 +71,20 @@ object TraceContextAware { def default: TraceContextAware = new DefaultTraceContextAware class DefaultTraceContextAware extends TraceContextAware { - val captureNanoTime = System.nanoTime() - val traceContext = TraceRecorder.currentContext + @transient val captureNanoTime = System.nanoTime() + @transient val traceContext = TraceRecorder.currentContext + + // + // Beware of this hack, it might bite us in the future! + // + // When using remoting/cluster all messages carry the TraceContext in the envelope in which they + // are sent but that doesn't apply to System Messages. We are certain that the TraceContext is + // available (if any) when the system messages are read and this will make sure that it is correctly + // captured and propagated. + @throws[ObjectStreamException] + private def readResolve: AnyRef = { + new DefaultTraceContextAware + } } } @@ -75,11 +98,15 @@ object SegmentCompletionHandleAware { class DefaultSegmentCompletionHandleAware extends DefaultTraceContextAware with SegmentCompletionHandleAware {} } -class SimpleMetricCollectionContext(@volatile private var _name: String, val token: String, metadata: Map[String, String], - val system: ActorSystem) extends TraceContext { - @volatile private var _isOpen = true +class SimpleMetricCollectionContext(traceName: String, val token: String, metadata: Map[String, String], + val origin: TraceContextOrigin, val system: ActorSystem, val startMilliTime: Long = System.currentTimeMillis, + izOpen: Boolean = true) extends TraceContext { + + @volatile private var _name = traceName + @volatile private var _isOpen = izOpen + val levelOfDetail = OnlyMetrics - val startMark = System.nanoTime() + val startNanoTime = System.nanoTime() val finishedSegments = new ConcurrentLinkedQueue[SegmentData]() val metricsExtension = Kamon(Metrics)(system) @@ -91,11 +118,20 @@ class SimpleMetricCollectionContext(@volatile private var _name: String, val tok def finish(metadata: Map[String, String]): Unit = { _isOpen = false - val finishMark = System.nanoTime() + + val elapsedNanoTime = + if (origin == TraceContextOrigin.Local) + // Everything is local, nanoTime is still the best resolution we can use. + System.nanoTime() - startNanoTime + else + // For a remote TraceContext we can only rely on the startMilliTime and we need to scale it to nanoseconds + // to be consistent with unit used for all latency measurements. + (System.currentTimeMillis() - startMilliTime) * 1000000L + val metricRecorder = metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory) metricRecorder.map { traceMetrics ⇒ - traceMetrics.elapsedTime.record(finishMark - startMark) + traceMetrics.elapsedTime.record(elapsedNanoTime) drainFinishedSegments(traceMetrics) } } diff --git a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala index 0b3118ed..bc7a0db2 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala @@ -16,6 +16,8 @@ package kamon.trace +import akka.remote.instrumentation.TraceContextAwareWireFormats.RemoteTraceContext + import scala.language.experimental.macros import java.util.concurrent.atomic.AtomicLong import kamon.macros.InlineTraceContextMacro @@ -40,7 +42,27 @@ object TraceRecorder { // In the future this should select between implementations. val finalToken = token.getOrElse(newToken) - new SimpleMetricCollectionContext(name, finalToken, metadata, system) + new SimpleMetricCollectionContext(name, finalToken, metadata, TraceContextOrigin.Local, system) + } + + def joinRemoteTraceContext(remoteTraceContext: RemoteTraceContext, system: ActorSystem): TraceContext = { + new SimpleMetricCollectionContext( + remoteTraceContext.getTraceName(), + remoteTraceContext.getTraceToken(), + Map.empty, + TraceContextOrigin.Remote, + system, + remoteTraceContext.getStartMilliTime(), + remoteTraceContext.getIsOpen()) + } + + def forkTraceContext(context: TraceContext, newName: String): TraceContext = { + new SimpleMetricCollectionContext( + newName, + context.token, + Map.empty, + TraceContextOrigin.Local, + context.system) } def setContext(context: Option[TraceContext]): Unit = traceContextStorage.set(context) |