aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2014-08-31 21:52:06 -0300
committerIvan Topolnjak <ivantopo@gmail.com>2014-08-31 21:52:06 -0300
commitc71d44cb57c5daddaa8e58cd1b559bb88bbd2a04 (patch)
treed6690bef6f442d63673223447b807223a9b8cafc /kamon-core/src/main/scala
parent6fe0f1294a53d45e6c1b863f32fe22f3a641d25c (diff)
downloadKamon-c71d44cb57c5daddaa8e58cd1b559bb88bbd2a04.tar.gz
Kamon-c71d44cb57c5daddaa8e58cd1b559bb88bbd2a04.tar.bz2
Kamon-c71d44cb57c5daddaa8e58cd1b559bb88bbd2a04.zip
+ core: initial support for akka remoting/cluster, related to #61
Diffstat (limited to 'kamon-core/src/main/scala')
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala91
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceContext.scala54
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala24
3 files changed, 159 insertions, 10 deletions
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala
new file mode 100644
index 00000000..341b0ee7
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/RemotingInstrumentation.scala
@@ -0,0 +1,91 @@
+package akka.remote.instrumentation
+
+import akka.actor.{ ActorRef, Address }
+import akka.dispatch.sysmsg.SystemMessage
+import akka.remote.instrumentation.TraceContextAwareWireFormats.{ TraceContextAwareRemoteEnvelope, RemoteTraceContext, AckAndTraceContextAwareEnvelopeContainer }
+import akka.remote.transport.AkkaPduCodec.Message
+import akka.remote.{ RemoteActorRefProvider, Ack, SeqNo }
+import akka.remote.WireFormats._
+import akka.util.ByteString
+import kamon.trace.{ TraceContextAware, TraceRecorder }
+import org.aspectj.lang.ProceedingJoinPoint
+import org.aspectj.lang.annotation._
+
+@Aspect
+class RemotingInstrumentation {
+
+ @Pointcut("execution(* akka.remote.transport.AkkaPduProtobufCodec$.constructMessage(..)) && args(localAddress, recipient, serializedMessage, senderOption, seqOption, ackOption)")
+ def constructAkkaPduMessage(localAddress: Address, recipient: ActorRef, serializedMessage: SerializedMessage,
+ senderOption: Option[ActorRef], seqOption: Option[SeqNo], ackOption: Option[Ack]): Unit = {}
+
+ @Around("constructAkkaPduMessage(localAddress, recipient, serializedMessage, senderOption, seqOption, ackOption)")
+ def aroundSerializeRemoteMessage(pjp: ProceedingJoinPoint, localAddress: Address, recipient: ActorRef,
+ serializedMessage: SerializedMessage, senderOption: Option[ActorRef], seqOption: Option[SeqNo], ackOption: Option[Ack]): AnyRef = {
+
+ val ackAndEnvelopeBuilder = AckAndTraceContextAwareEnvelopeContainer.newBuilder
+ val envelopeBuilder = TraceContextAwareRemoteEnvelope.newBuilder
+
+ envelopeBuilder.setRecipient(serializeActorRef(recipient.path.address, recipient))
+ senderOption foreach { ref ⇒ envelopeBuilder.setSender(serializeActorRef(localAddress, ref)) }
+ seqOption foreach { seq ⇒ envelopeBuilder.setSeq(seq.rawValue) }
+ ackOption foreach { ack ⇒ ackAndEnvelopeBuilder.setAck(ackBuilder(ack)) }
+ envelopeBuilder.setMessage(serializedMessage)
+
+ // Attach the TraceContext info, if available.
+ TraceRecorder.currentContext.foreach { context ⇒
+ envelopeBuilder.setTraceContext(RemoteTraceContext.newBuilder()
+ .setTraceName(context.name)
+ .setTraceToken(context.token)
+ .setIsOpen(context.isOpen)
+ .setStartMilliTime(context.startMilliTime)
+ .build())
+ }
+
+ ackAndEnvelopeBuilder.setEnvelope(envelopeBuilder)
+ ByteString.ByteString1C(ackAndEnvelopeBuilder.build.toByteArray) //Reuse Byte Array (naughty!)
+ }
+
+ // Copied from akka.remote.transport.AkkaPduProtobufCodec because of private access.
+ private def ackBuilder(ack: Ack): AcknowledgementInfo.Builder = {
+ val ackBuilder = AcknowledgementInfo.newBuilder()
+ ackBuilder.setCumulativeAck(ack.cumulativeAck.rawValue)
+ ack.nacks foreach { nack ⇒ ackBuilder.addNacks(nack.rawValue) }
+ ackBuilder
+ }
+
+ // Copied from akka.remote.transport.AkkaPduProtobufCodec because of private access.
+ private def serializeActorRef(defaultAddress: Address, ref: ActorRef): ActorRefData = {
+ ActorRefData.newBuilder.setPath(
+ if (ref.path.address.host.isDefined) ref.path.toSerializationFormat else ref.path.toSerializationFormatWithAddress(defaultAddress)).build()
+ }
+
+ // Copied from akka.remote.transport.AkkaPduProtobufCodec because of private access.
+ private def serializeAddress(address: Address): AddressData = address match {
+ case Address(protocol, system, Some(host), Some(port)) ⇒
+ AddressData.newBuilder
+ .setHostname(host)
+ .setPort(port)
+ .setSystem(system)
+ .setProtocol(protocol)
+ .build()
+ case _ ⇒ throw new IllegalArgumentException(s"Address [${address}] could not be serialized: host or port missing.")
+ }
+
+ @Pointcut("execution(* akka.remote.transport.AkkaPduProtobufCodec$.decodeMessage(..)) && args(bs, provider, localAddress)") // && args(raw, provider, localAddress)")
+ def decodeRemoteMessage(bs: ByteString, provider: RemoteActorRefProvider, localAddress: Address): Unit = {} //(raw: ByteString, provider: RemoteActorRefProvider, localAddress: Address): Unit = {}
+
+ @Around("decodeRemoteMessage(bs, provider, localAddress)")
+ def aroundDecodeRemoteMessage(pjp: ProceedingJoinPoint, bs: ByteString, provider: RemoteActorRefProvider, localAddress: Address): AnyRef = {
+ val ackAndEnvelope = AckAndTraceContextAwareEnvelopeContainer.parseFrom(bs.toArray)
+
+ if (ackAndEnvelope.hasEnvelope && ackAndEnvelope.getEnvelope.hasTraceContext) {
+ val traceContext = ackAndEnvelope.getEnvelope.getTraceContext
+ val system = provider.guardian.underlying.system
+ val tc = TraceRecorder.joinRemoteTraceContext(traceContext, system)
+
+ TraceRecorder.setContext(Some(tc))
+ }
+
+ pjp.proceed()
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
index 9ce3cd4e..6ea30511 100644
--- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
+++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
@@ -16,6 +16,8 @@
package kamon.trace
+import java.io.ObjectStreamException
+
import akka.actor.ActorSystem
import kamon.Kamon
import kamon.metric._
@@ -32,6 +34,9 @@ trait TraceContext {
def levelOfDetail: TracingLevelOfDetail
def startSegment(identity: SegmentIdentity, metadata: Map[String, String]): SegmentCompletionHandle
def finish(metadata: Map[String, String])
+ def origin: TraceContextOrigin
+ def startMilliTime: Long
+ def isOpen: Boolean
private[kamon] val traceLocalStorage: TraceLocalStorage = new TraceLocalStorage
}
@@ -51,7 +56,13 @@ case object OnlyMetrics extends TracingLevelOfDetail
case object SimpleTrace extends TracingLevelOfDetail
case object FullTrace extends TracingLevelOfDetail
-trait TraceContextAware {
+sealed trait TraceContextOrigin
+object TraceContextOrigin {
+ case object Local extends TraceContextOrigin
+ case object Remote extends TraceContextOrigin
+}
+
+trait TraceContextAware extends Serializable {
def captureNanoTime: Long
def traceContext: Option[TraceContext]
}
@@ -60,8 +71,20 @@ object TraceContextAware {
def default: TraceContextAware = new DefaultTraceContextAware
class DefaultTraceContextAware extends TraceContextAware {
- val captureNanoTime = System.nanoTime()
- val traceContext = TraceRecorder.currentContext
+ @transient val captureNanoTime = System.nanoTime()
+ @transient val traceContext = TraceRecorder.currentContext
+
+ //
+ // Beware of this hack, it might bite us in the future!
+ //
+ // When using remoting/cluster all messages carry the TraceContext in the envelope in which they
+ // are sent but that doesn't apply to System Messages. We are certain that the TraceContext is
+ // available (if any) when the system messages are read and this will make sure that it is correctly
+ // captured and propagated.
+ @throws[ObjectStreamException]
+ private def readResolve: AnyRef = {
+ new DefaultTraceContextAware
+ }
}
}
@@ -75,11 +98,15 @@ object SegmentCompletionHandleAware {
class DefaultSegmentCompletionHandleAware extends DefaultTraceContextAware with SegmentCompletionHandleAware {}
}
-class SimpleMetricCollectionContext(@volatile private var _name: String, val token: String, metadata: Map[String, String],
- val system: ActorSystem) extends TraceContext {
- @volatile private var _isOpen = true
+class SimpleMetricCollectionContext(traceName: String, val token: String, metadata: Map[String, String],
+ val origin: TraceContextOrigin, val system: ActorSystem, val startMilliTime: Long = System.currentTimeMillis,
+ izOpen: Boolean = true) extends TraceContext {
+
+ @volatile private var _name = traceName
+ @volatile private var _isOpen = izOpen
+
val levelOfDetail = OnlyMetrics
- val startMark = System.nanoTime()
+ val startNanoTime = System.nanoTime()
val finishedSegments = new ConcurrentLinkedQueue[SegmentData]()
val metricsExtension = Kamon(Metrics)(system)
@@ -91,11 +118,20 @@ class SimpleMetricCollectionContext(@volatile private var _name: String, val tok
def finish(metadata: Map[String, String]): Unit = {
_isOpen = false
- val finishMark = System.nanoTime()
+
+ val elapsedNanoTime =
+ if (origin == TraceContextOrigin.Local)
+ // Everything is local, nanoTime is still the best resolution we can use.
+ System.nanoTime() - startNanoTime
+ else
+ // For a remote TraceContext we can only rely on the startMilliTime and we need to scale it to nanoseconds
+ // to be consistent with unit used for all latency measurements.
+ (System.currentTimeMillis() - startMilliTime) * 1000000L
+
val metricRecorder = metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory)
metricRecorder.map { traceMetrics ⇒
- traceMetrics.elapsedTime.record(finishMark - startMark)
+ traceMetrics.elapsedTime.record(elapsedNanoTime)
drainFinishedSegments(traceMetrics)
}
}
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala
index 0b3118ed..bc7a0db2 100644
--- a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala
+++ b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala
@@ -16,6 +16,8 @@
package kamon.trace
+import akka.remote.instrumentation.TraceContextAwareWireFormats.RemoteTraceContext
+
import scala.language.experimental.macros
import java.util.concurrent.atomic.AtomicLong
import kamon.macros.InlineTraceContextMacro
@@ -40,7 +42,27 @@ object TraceRecorder {
// In the future this should select between implementations.
val finalToken = token.getOrElse(newToken)
- new SimpleMetricCollectionContext(name, finalToken, metadata, system)
+ new SimpleMetricCollectionContext(name, finalToken, metadata, TraceContextOrigin.Local, system)
+ }
+
+ def joinRemoteTraceContext(remoteTraceContext: RemoteTraceContext, system: ActorSystem): TraceContext = {
+ new SimpleMetricCollectionContext(
+ remoteTraceContext.getTraceName(),
+ remoteTraceContext.getTraceToken(),
+ Map.empty,
+ TraceContextOrigin.Remote,
+ system,
+ remoteTraceContext.getStartMilliTime(),
+ remoteTraceContext.getIsOpen())
+ }
+
+ def forkTraceContext(context: TraceContext, newName: String): TraceContext = {
+ new SimpleMetricCollectionContext(
+ newName,
+ context.token,
+ Map.empty,
+ TraceContextOrigin.Local,
+ context.system)
}
def setContext(context: Option[TraceContext]): Unit = traceContextStorage.set(context)