aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/akka/instrumentation
diff options
context:
space:
mode:
authorIvan Topolnak <itopolnak@despegar.com>2014-01-02 18:09:53 -0300
committerIvan Topolnak <itopolnak@despegar.com>2014-01-13 17:37:20 -0300
commit7a10c0ef2a6566229e8571f6d385ca2ff794cc20 (patch)
treececd7ce6eb7a71f967eaa1605615780fa94d346c /kamon-core/src/main/scala/akka/instrumentation
parent54143e4af6182b967736abc60a7fb20c88dd6587 (diff)
downloadKamon-7a10c0ef2a6566229e8571f6d385ca2ff794cc20.tar.gz
Kamon-7a10c0ef2a6566229e8571f6d385ca2ff794cc20.tar.bz2
Kamon-7a10c0ef2a6566229e8571f6d385ca2ff794cc20.zip
integrate trace and metrics into the base project
Diffstat (limited to 'kamon-core/src/main/scala/akka/instrumentation')
-rw-r--r--kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala93
-rw-r--r--kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala65
-rw-r--r--kamon-core/src/main/scala/akka/instrumentation/AskPatternTracing.scala49
3 files changed, 207 insertions, 0 deletions
diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
new file mode 100644
index 00000000..6cede344
--- /dev/null
+++ b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
@@ -0,0 +1,93 @@
+/* ===================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================================================== */
+package akka.instrumentation
+
+import org.aspectj.lang.annotation._
+import org.aspectj.lang.ProceedingJoinPoint
+import akka.actor.{ Cell, Props, ActorSystem, ActorRef }
+import akka.dispatch.{ Envelope, MessageDispatcher }
+import kamon.trace.{ TraceContext, ContextAware, Trace }
+import kamon.metrics.{ HdrActorMetricsRecorder, ActorMetrics }
+import kamon.Kamon
+
+@Aspect("perthis(actorCellCreation(*, *, *, *, *))")
+class BehaviourInvokeTracing {
+ var path: Option[String] = None
+ var actorMetrics: Option[HdrActorMetricsRecorder] = None
+
+ @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, dispatcher, parent)")
+ def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {}
+
+ @After("actorCellCreation(system, ref, props, dispatcher, parent)")
+ def afterCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {
+ val metricsExtension = Kamon(ActorMetrics)(system)
+ val simplePathString = ref.path.elements.mkString("/")
+
+ if (metricsExtension.shouldTrackActor(simplePathString)) {
+ path = Some(ref.path.toString)
+ actorMetrics = Some(metricsExtension.registerActor(simplePathString))
+ }
+ }
+
+ @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && args(envelope)")
+ def invokingActorBehaviourAtActorCell(envelope: Envelope) = {}
+
+ @Around("invokingActorBehaviourAtActorCell(envelope)")
+ def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = {
+ val timestampBeforeProcessing = System.nanoTime()
+ val contextAndTimestamp = envelope.asInstanceOf[ContextAndTimestampAware]
+
+ Trace.withContext(contextAndTimestamp.traceContext) {
+ pjp.proceed()
+ }
+
+ actorMetrics.map { am ⇒
+ am.recordProcessingTime(System.nanoTime() - timestampBeforeProcessing)
+ am.recordTimeInMailbox(timestampBeforeProcessing - contextAndTimestamp.timestamp)
+ }
+ }
+
+ @Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)")
+ def actorStop(cell: Cell): Unit = {}
+
+ @After("actorStop(cell)")
+ def afterStop(cell: Cell): Unit = {
+ path.map(p ⇒ Kamon(ActorMetrics)(cell.system).unregisterActor(p))
+ }
+}
+
+@Aspect
+class EnvelopeTraceContextMixin {
+
+ @DeclareMixin("akka.dispatch.Envelope")
+ def mixin: ContextAndTimestampAware = new ContextAndTimestampAware {
+ val traceContext: Option[TraceContext] = Trace.context()
+ val timestamp: Long = System.nanoTime()
+ }
+
+ @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)")
+ def envelopeCreation(ctx: ContextAware): Unit = {}
+
+ @After("envelopeCreation(ctx)")
+ def afterEnvelopeCreation(ctx: ContextAware): Unit = {
+ // Necessary to force the initialization of ContextAware at the moment of creation.
+ ctx.traceContext
+ }
+}
+
+trait ContextAndTimestampAware extends ContextAware {
+ def timestamp: Long
+}
diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala
new file mode 100644
index 00000000..7d26016e
--- /dev/null
+++ b/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala
@@ -0,0 +1,65 @@
+package akka.instrumentation
+
+import org.aspectj.lang.annotation._
+import kamon.trace.{ Trace, ContextAware }
+import akka.dispatch.sysmsg.EarliestFirstSystemMessageList
+import org.aspectj.lang.ProceedingJoinPoint
+
+@Aspect
+class SystemMessageTraceContextMixin {
+
+ @DeclareMixin("akka.dispatch.sysmsg.SystemMessage+")
+ def mixin: ContextAware = ContextAware.default
+
+ @Pointcut("execution(akka.dispatch.sysmsg.SystemMessage+.new(..)) && this(ctx)")
+ def envelopeCreation(ctx: ContextAware): Unit = {}
+
+ @After("envelopeCreation(ctx)")
+ def afterEnvelopeCreation(ctx: ContextAware): Unit = {
+ // Necessary to force the initialization of ContextAware at the moment of creation.
+ ctx.traceContext
+ }
+}
+
+@Aspect
+class RepointableActorRefTraceContextMixin {
+
+ @DeclareMixin("akka.actor.RepointableActorRef")
+ def mixin: ContextAware = ContextAware.default
+
+ @Pointcut("execution(akka.actor.RepointableActorRef.new(..)) && this(ctx)")
+ def envelopeCreation(ctx: ContextAware): Unit = {}
+
+ @After("envelopeCreation(ctx)")
+ def afterEnvelopeCreation(ctx: ContextAware): Unit = {
+ // Necessary to force the initialization of ContextAware at the moment of creation.
+ ctx.traceContext
+ }
+
+ @Pointcut("execution(* akka.actor.RepointableActorRef.point(..)) && this(repointableActorRef)")
+ def repointableActorRefCreation(repointableActorRef: ContextAware): Unit = {}
+
+ @Around("repointableActorRefCreation(repointableActorRef)")
+ def afterRepointableActorRefCreation(pjp: ProceedingJoinPoint, repointableActorRef: ContextAware): Any = {
+ Trace.withContext(repointableActorRef.traceContext) {
+ pjp.proceed()
+ }
+ }
+
+}
+
+@Aspect
+class ActorSystemMessagePassingTracing {
+
+ @Pointcut("execution(* akka.actor.ActorCell.invokeAll$1(..)) && args(messages, *)")
+ def systemMessageProcessing(messages: EarliestFirstSystemMessageList): Unit = {}
+
+ @Around("systemMessageProcessing(messages)")
+ def aroundSystemMessageInvoke(pjp: ProceedingJoinPoint, messages: EarliestFirstSystemMessageList): Any = {
+ if (messages.nonEmpty) {
+ val ctx = messages.head.asInstanceOf[ContextAware].traceContext
+ Trace.withContext(ctx)(pjp.proceed())
+
+ } else pjp.proceed()
+ }
+}
diff --git a/kamon-core/src/main/scala/akka/instrumentation/AskPatternTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/AskPatternTracing.scala
new file mode 100644
index 00000000..b5b23e61
--- /dev/null
+++ b/kamon-core/src/main/scala/akka/instrumentation/AskPatternTracing.scala
@@ -0,0 +1,49 @@
+/* ===================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================================================== */
+package akka.instrumentation
+
+import org.aspectj.lang.annotation.{ AfterReturning, Pointcut, Aspect }
+import akka.event.Logging.Warning
+import scala.compat.Platform.EOL
+import akka.actor.ActorRefProvider
+import akka.pattern.{ AskTimeoutException, PromiseActorRef }
+
+@Aspect
+class AskPatternTracing {
+
+ class StackTraceCaptureException extends Throwable
+
+ @Pointcut(value = "execution(* akka.pattern.PromiseActorRef$.apply(..)) && args(provider, *)", argNames = "provider")
+ def promiseActorRefApply(provider: ActorRefProvider): Unit = {
+ provider.settings.config.getBoolean("kamon.trace.ask-pattern-tracing")
+ }
+
+ @AfterReturning(pointcut = "promiseActorRefApply(provider)", returning = "promiseActor")
+ def hookAskTimeoutWarning(provider: ActorRefProvider, promiseActor: PromiseActorRef): Unit = {
+ val future = promiseActor.result.future
+ val system = promiseActor.provider.guardian.underlying.system
+ implicit val ec = system.dispatcher
+ val stack = new StackTraceCaptureException
+
+ future onFailure {
+ case timeout: AskTimeoutException ⇒
+ val stackString = stack.getStackTrace.drop(3).mkString("", EOL, EOL)
+
+ system.eventStream.publish(Warning("AskPatternTracing", classOf[AskPatternTracing],
+ "Timeout triggered for ask pattern registered at: " + stackString))
+ }
+ }
+}