From ff7ac0ec79dd61849b4c76b10f74af72c7cceea9 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Sun, 28 Dec 2014 07:59:59 +0100 Subject: ! core,akka: separate all akka instrumentation into it's own kamon-akka module, related to #136. All Akka-related instrumentation and code has been moved to the kamon-akka module, including the filters for actor, dispatcher and router metrics. Also the following changes are included: - Router Metrics are now working properly, related to #139. - Cleanup the log output for this module, related to #142. - Some minor cleanups in various tests. This PR breaks the reporting modules which will need to wait for #141 to be ready to come back to life. --- .../protobuf/TraceContextAwareWireFormats.proto | 31 ---- kamon-core/src/main/protobuf/WireFormats.proto | 132 ---------------- kamon-core/src/main/resources/META-INF/aop.xml | 21 --- kamon-core/src/main/resources/reference.conf | 44 ------ .../src/main/scala/kamon/akka/AkkaExtension.scala | 32 ---- .../akka/ActorCellInstrumentation.scala | 176 --------------------- .../akka/ActorLoggingInstrumentation.scala | 50 ------ .../akka/ActorSystemMessageInstrumentation.scala | 80 ---------- .../akka/AskPatternInstrumentation.scala | 88 ----------- .../akka/DispatcherInstrumentation.scala | 163 ------------------- .../src/main/scala/kamon/metric/ActorMetrics.scala | 93 ----------- .../scala/kamon/metric/DispatcherMetrics.scala | 93 ----------- .../main/scala/kamon/metric/RouterMetrics.scala | 82 ---------- 13 files changed, 1085 deletions(-) delete mode 100644 kamon-core/src/main/protobuf/TraceContextAwareWireFormats.proto delete mode 100644 kamon-core/src/main/protobuf/WireFormats.proto delete mode 100644 kamon-core/src/main/scala/kamon/akka/AkkaExtension.scala delete mode 100644 kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala delete mode 100644 kamon-core/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala delete mode 100644 kamon-core/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala delete mode 100644 kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala delete mode 100644 kamon-core/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala delete mode 100644 kamon-core/src/main/scala/kamon/metric/ActorMetrics.scala delete mode 100644 kamon-core/src/main/scala/kamon/metric/DispatcherMetrics.scala delete mode 100644 kamon-core/src/main/scala/kamon/metric/RouterMetrics.scala (limited to 'kamon-core/src/main') diff --git a/kamon-core/src/main/protobuf/TraceContextAwareWireFormats.proto b/kamon-core/src/main/protobuf/TraceContextAwareWireFormats.proto deleted file mode 100644 index d4ee21b5..00000000 --- a/kamon-core/src/main/protobuf/TraceContextAwareWireFormats.proto +++ /dev/null @@ -1,31 +0,0 @@ -import "WireFormats.proto"; - -option java_package = "akka.remote.instrumentation"; -option optimize_for = SPEED; - - -/************************************************ - * Kamon-specific additions to the protocol - ************************************************/ - -message AckAndTraceContextAwareEnvelopeContainer { - optional AcknowledgementInfo ack = 1; - optional TraceContextAwareRemoteEnvelope envelope = 2; -} - -message TraceContextAwareRemoteEnvelope { - required ActorRefData recipient = 1; - required SerializedMessage message = 2; - optional ActorRefData sender = 4; - optional fixed64 seq = 5; - - optional RemoteTraceContext traceContext = 15; -} - -message RemoteTraceContext { - required string traceName = 1; - required string traceToken = 2; - required bool isOpen = 3; - required fixed64 startMilliTime = 4; -} - diff --git a/kamon-core/src/main/protobuf/WireFormats.proto b/kamon-core/src/main/protobuf/WireFormats.proto deleted file mode 100644 index 98a954cc..00000000 --- a/kamon-core/src/main/protobuf/WireFormats.proto +++ /dev/null @@ -1,132 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ - -// Extracted from https://github.com/akka/akka/blob/master/akka-remote/src/main/protobuf/WireFormats.proto - - -option java_package = "akka.remote"; -option optimize_for = SPEED; - -/****************************************** - * Remoting message formats - ******************************************/ - - -message AckAndEnvelopeContainer { - optional AcknowledgementInfo ack = 1; - optional RemoteEnvelope envelope = 2; -} - -/** - * Defines a remote message. - */ -message RemoteEnvelope { - required ActorRefData recipient = 1; - required SerializedMessage message = 2; - optional ActorRefData sender = 4; - optional fixed64 seq = 5; -} - -message AcknowledgementInfo { - required fixed64 cumulativeAck = 1; - repeated fixed64 nacks = 2; -} - -/** - * Defines a remote ActorRef that "remembers" and uses its original Actor instance - * on the original node. - */ -message ActorRefData { - required string path = 1; -} - -/** - * Defines a message. - */ -message SerializedMessage { - required bytes message = 1; - required int32 serializerId = 2; - optional bytes messageManifest = 3; -} - -/** - * Defines akka.remote.DaemonMsgCreate - */ -message DaemonMsgCreateData { - required PropsData props = 1; - required DeployData deploy = 2; - required string path = 3; - required ActorRefData supervisor = 4; -} - -/** - * Serialization of akka.actor.Props - */ -message PropsData { - required DeployData deploy = 2; - required string clazz = 3; - repeated bytes args = 4; - repeated string classes = 5; -} - -/** - * Serialization of akka.actor.Deploy - */ -message DeployData { - required string path = 1; - optional bytes config = 2; - optional bytes routerConfig = 3; - optional bytes scope = 4; - optional string dispatcher = 5; -} - - -/****************************************** - * Akka Protocol message formats - ******************************************/ - -/** - * Message format of Akka Protocol. - * Message contains either a payload or an instruction. - */ -message AkkaProtocolMessage { - optional bytes payload = 1; - optional AkkaControlMessage instruction = 2; -} - -/** - * Defines some control messages for the remoting - */ -message AkkaControlMessage { - required CommandType commandType = 1; - optional AkkaHandshakeInfo handshakeInfo = 2; -} - -message AkkaHandshakeInfo { - required AddressData origin = 1; - required fixed64 uid = 2; - optional string cookie = 3; - -} - -/** - * Defines the type of the AkkaControlMessage command type - */ -enum CommandType { - ASSOCIATE = 1; - DISASSOCIATE = 2; - HEARTBEAT = 3; - DISASSOCIATE_SHUTTING_DOWN = 4; // Remote system is going down and will not accepts new connections - DISASSOCIATE_QUARANTINED = 5; // Remote system refused the association since the current system is quarantined -} - -/** - * Defines a remote address. - */ -message AddressData { - required string system = 1; - required string hostname = 2; - required uint32 port = 3; - optional string protocol = 4; -} diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop.xml index 8c7eb235..47ce11d8 100644 --- a/kamon-core/src/main/resources/META-INF/aop.xml +++ b/kamon-core/src/main/resources/META-INF/aop.xml @@ -5,38 +5,17 @@ - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index 2d8b3f2e..8f5a8b45 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -87,41 +87,10 @@ kamon { significant-value-digits = 2 } - - actor { - processing-time = ${kamon.metrics.precision.default-histogram-precision} - time-in-mailbox = ${kamon.metrics.precision.default-histogram-precision} - mailbox-size = ${kamon.metrics.precision.default-min-max-counter-precision} - } - - router { - processing-time = ${kamon.metrics.precision.default-histogram-precision} - time-in-mailbox = ${kamon.metrics.precision.default-histogram-precision} - } - trace { elapsed-time = ${kamon.metrics.precision.default-histogram-precision} segment = ${kamon.metrics.precision.default-histogram-precision} } - - dispatcher { - maximum-pool-size { - highest-trackable-value = 999999999 - significant-value-digits = 2 - } - running-thread-count { - highest-trackable-value = 999999999 - significant-value-digits = 2 - } - queued-task-count { - highest-trackable-value = 999999999 - significant-value-digits = 2 - } - pool-size { - highest-trackable-value = 999999999 - significant-value-digits = 2 - } - } } } @@ -176,19 +145,6 @@ kamon { dispatcher = ${kamon.default-dispatcher} } - akka { - # If ask-pattern-timeout-warning is enabled, a WARN level log message will be generated if a future generated by the `ask` - # pattern fails with a `AskTimeoutException` and the log message will contain information depending of the strategy selected. - # strategies: - # - off: nothing to do. - # - lightweight: logs the warning when a timeout is reached using org.aspectj.lang.reflect.SourceLocation. - # - heavyweight: logs the warning when a timeout is reached using a stack trace captured at the moment the future was created. - ask-pattern-timeout-warning = off - - # Default dispatcher for all akka module operations - dispatcher = ${kamon.default-dispatcher} - } - kamon-dispatcher { # Dispatcher is the name of the event-based dispatcher type = Dispatcher diff --git a/kamon-core/src/main/scala/kamon/akka/AkkaExtension.scala b/kamon-core/src/main/scala/kamon/akka/AkkaExtension.scala deleted file mode 100644 index 2fc7395e..00000000 --- a/kamon-core/src/main/scala/kamon/akka/AkkaExtension.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ -// -package kamon.extension.akka - -import akka.actor -import akka.actor._ -import kamon._ - -class AkkaExtension(system: ExtendedActorSystem) extends Kamon.Extension { - val config = system.settings.config.getConfig("kamon.akka") - val askPatternTimeoutWarning = config.getString("ask-pattern-timeout-warning") - val dispatcher = system.dispatchers.lookup(config.getString("dispatcher")) -} - -object Akka extends ExtensionId[AkkaExtension] with ExtensionIdProvider { - def lookup(): ExtensionId[_ <: actor.Extension] = Akka - def createExtension(system: ExtendedActorSystem): AkkaExtension = new AkkaExtension(system) -} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala deleted file mode 100644 index a340566d..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala +++ /dev/null @@ -1,176 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package akka.kamon.instrumentation - -import akka.actor._ -import akka.dispatch.{ Envelope, MessageDispatcher } -import akka.routing.RoutedActorCell -import kamon.Kamon -import kamon.metric.ActorMetrics.ActorMetricsRecorder -import kamon.metric.RouterMetrics.RouterMetricsRecorder -import kamon.metric.{ ActorMetrics, Metrics, RouterMetrics } -import kamon.trace._ -import org.aspectj.lang.ProceedingJoinPoint -import org.aspectj.lang.annotation._ - -@Aspect -class ActorCellInstrumentation { - - import ActorCellInstrumentation.PimpedActorCellMetrics - - @Pointcut("execution(akka.actor.ActorCell.new(..)) && this(cell) && args(system, ref, props, dispatcher, parent)") - def actorCellCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {} - - @After("actorCellCreation(cell, system, ref, props, dispatcher, parent)") - def afterCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = { - - val metricsExtension = Kamon(Metrics)(system) - val metricIdentity = ActorMetrics(ref.path.elements.mkString("/")) - val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - - cellWithMetrics.actorMetricIdentity = metricIdentity - cellWithMetrics.actorMetricsRecorder = metricsExtension.register(metricIdentity, ActorMetrics.Factory) - - cellWithMetrics.onRoutedActorCell { routedActorCell ⇒ - val routerMetricIdentity = RouterMetrics(s"${routedActorCell.asInstanceOf[RoutedActorCell].self.path.elements.mkString("/")}") - routedActorCell.routerMetricIdentity = routerMetricIdentity - routedActorCell.routerMetricsRecorder = metricsExtension.register(routerMetricIdentity, RouterMetrics.Factory) - } - } - - @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && this(cell) && args(envelope)") - def invokingActorBehaviourAtActorCell(cell: ActorCell, envelope: Envelope) = {} - - @Around("invokingActorBehaviourAtActorCell(cell, envelope)") - def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, cell: ActorCell, envelope: Envelope): Any = { - val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - val timestampBeforeProcessing = System.nanoTime() - val contextAndTimestamp = envelope.asInstanceOf[TimestampedTraceContextAware] - - try { - TraceRecorder.withInlineTraceContextReplacement(contextAndTimestamp.traceContext) { - pjp.proceed() - } - } finally { - cellWithMetrics.actorMetricsRecorder.map { - am ⇒ - val processingTime = System.nanoTime() - timestampBeforeProcessing - val timeInMailbox = timestampBeforeProcessing - contextAndTimestamp.captureNanoTime - - am.processingTime.record(processingTime) - am.timeInMailbox.record(timeInMailbox) - am.mailboxSize.decrement() - - (processingTime, timeInMailbox) - } map { - case (processingTime, timeInMailbox) ⇒ - cellWithMetrics.onRoutedActorCell { routedActorCell ⇒ - routedActorCell.routerMetricsRecorder.map { - rm ⇒ - rm.processingTime.record(processingTime) - rm.timeInMailbox.record(timeInMailbox) - } - } - } - } - } - - @Pointcut("execution(* akka.actor.ActorCell.sendMessage(*)) && this(cell)") - def sendingMessageToActorCell(cell: ActorCell): Unit = {} - - @After("sendingMessageToActorCell(cell)") - def afterSendMessageToActorCell(cell: ActorCell): Unit = { - val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - cellWithMetrics.actorMetricsRecorder.map(am ⇒ am.mailboxSize.increment()) - } - - @Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)") - def actorStop(cell: ActorCell): Unit = {} - - @After("actorStop(cell)") - def afterStop(cell: ActorCell): Unit = { - val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - - cellWithMetrics.actorMetricsRecorder.map { p ⇒ - Kamon(Metrics)(cell.system).unregister(cellWithMetrics.actorMetricIdentity) - } - - cellWithMetrics.onRoutedActorCell { routedActorCell ⇒ - routedActorCell.routerMetricsRecorder.map { rm ⇒ - Kamon(Metrics)(cell.system).unregister(cellWithMetrics.routerMetricIdentity) - } - } - } - - @Pointcut("execution(* akka.actor.ActorCell.handleInvokeFailure(..)) && this(cell)") - def actorInvokeFailure(cell: ActorCell): Unit = {} - - @Before("actorInvokeFailure(cell)") - def beforeInvokeFailure(cell: ActorCell): Unit = { - val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - - cellWithMetrics.actorMetricsRecorder.map { - am ⇒ am.errors.increment() - } - - cellWithMetrics.onRoutedActorCell { routedActorCell ⇒ - routedActorCell.routerMetricsRecorder.map { - rm ⇒ rm.errors.increment() - } - } - } - -} - -trait ActorCellMetrics { - var actorMetricIdentity: ActorMetrics = _ - var routerMetricIdentity: RouterMetrics = _ - var actorMetricsRecorder: Option[ActorMetricsRecorder] = _ - var routerMetricsRecorder: Option[RouterMetricsRecorder] = _ -} - -@Aspect -class ActorCellMetricsIntoActorCellMixin { - - @DeclareMixin("akka.actor.ActorCell") - def mixinActorCellMetricsToActorCell: ActorCellMetrics = new ActorCellMetrics {} -} - -@Aspect -class TraceContextIntoEnvelopeMixin { - - @DeclareMixin("akka.dispatch.Envelope") - def mixinTraceContextAwareToEnvelope: TimestampedTraceContextAware = TimestampedTraceContextAware.default - - @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)") - def envelopeCreation(ctx: TimestampedTraceContextAware): Unit = {} - - @After("envelopeCreation(ctx)") - def afterEnvelopeCreation(ctx: TimestampedTraceContextAware): Unit = { - // Necessary to force the initialization of ContextAware at the moment of creation. - ctx.traceContext - } -} - -object ActorCellInstrumentation { - implicit class PimpedActorCellMetrics(cell: ActorCellMetrics) { - def onRoutedActorCell(block: ActorCellMetrics ⇒ Unit) = cell match { - case routedActorCell: RoutedActorCell ⇒ block(cell) - case everythingElse ⇒ - } - } -} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala deleted file mode 100644 index e0e5d316..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package akka.kamon.instrumentation - -import kamon.trace.logging.MdcKeysSupport -import kamon.trace.{ TraceContextAware, TraceRecorder } -import org.aspectj.lang.ProceedingJoinPoint -import org.aspectj.lang.annotation._ - -@Aspect -class ActorLoggingInstrumentation extends MdcKeysSupport { - - @DeclareMixin("akka.event.Logging.LogEvent+") - def mixinTraceContextAwareToLogEvent: TraceContextAware = TraceContextAware.default - - @Pointcut("execution(akka.event.Logging.LogEvent+.new(..)) && this(event)") - def logEventCreation(event: TraceContextAware): Unit = {} - - @After("logEventCreation(event)") - def captureTraceContext(event: TraceContextAware): Unit = { - // Force initialization of TraceContextAware - event.traceContext - } - - @Pointcut("execution(* akka.event.slf4j.Slf4jLogger.withMdc(..)) && args(logSource, logEvent, logStatement)") - def withMdcInvocation(logSource: String, logEvent: TraceContextAware, logStatement: () ⇒ _): Unit = {} - - @Around("withMdcInvocation(logSource, logEvent, logStatement)") - def aroundWithMdcInvocation(pjp: ProceedingJoinPoint, logSource: String, logEvent: TraceContextAware, logStatement: () ⇒ _): Unit = { - TraceRecorder.withInlineTraceContextReplacement(logEvent.traceContext) { - withMdc { - pjp.proceed() - } - } - } -} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala deleted file mode 100644 index 48016876..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala +++ /dev/null @@ -1,80 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package akka.kamon.instrumentation - -import akka.dispatch.sysmsg.EarliestFirstSystemMessageList -import kamon.trace.{ TraceContextAware, TraceRecorder } -import org.aspectj.lang.ProceedingJoinPoint -import org.aspectj.lang.annotation._ - -@Aspect -class ActorSystemMessageInstrumentation { - - @Pointcut("execution(* akka.actor.ActorCell.invokeAll$1(..)) && args(messages, *)") - def systemMessageProcessing(messages: EarliestFirstSystemMessageList): Unit = {} - - @Around("systemMessageProcessing(messages)") - def aroundSystemMessageInvoke(pjp: ProceedingJoinPoint, messages: EarliestFirstSystemMessageList): Any = { - if (messages.nonEmpty) { - val ctx = messages.head.asInstanceOf[TraceContextAware].traceContext - TraceRecorder.withInlineTraceContextReplacement(ctx)(pjp.proceed()) - - } else pjp.proceed() - } -} - -@Aspect -class TraceContextIntoSystemMessageMixin { - - @DeclareMixin("akka.dispatch.sysmsg.SystemMessage+") - def mixinTraceContextAwareToSystemMessage: TraceContextAware = TraceContextAware.default - - @Pointcut("execution(akka.dispatch.sysmsg.SystemMessage+.new(..)) && this(ctx)") - def envelopeCreation(ctx: TraceContextAware): Unit = {} - - @After("envelopeCreation(ctx)") - def afterEnvelopeCreation(ctx: TraceContextAware): Unit = { - // Necessary to force the initialization of ContextAware at the moment of creation. - ctx.traceContext - } -} - -@Aspect -class TraceContextIntoRepointableActorRefMixin { - - @DeclareMixin("akka.actor.RepointableActorRef") - def mixinTraceContextAwareToRepointableActorRef: TraceContextAware = TraceContextAware.default - - @Pointcut("execution(akka.actor.RepointableActorRef.new(..)) && this(ctx)") - def envelopeCreation(ctx: TraceContextAware): Unit = {} - - @After("envelopeCreation(ctx)") - def afterEnvelopeCreation(ctx: TraceContextAware): Unit = { - // Necessary to force the initialization of ContextAware at the moment of creation. - ctx.traceContext - } - - @Pointcut("execution(* akka.actor.RepointableActorRef.point(..)) && this(repointableActorRef)") - def repointableActorRefCreation(repointableActorRef: TraceContextAware): Unit = {} - - @Around("repointableActorRefCreation(repointableActorRef)") - def afterRepointableActorRefCreation(pjp: ProceedingJoinPoint, repointableActorRef: TraceContextAware): Any = { - TraceRecorder.withInlineTraceContextReplacement(repointableActorRef.traceContext) { - pjp.proceed() - } - } -} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala deleted file mode 100644 index 5b4fbbc8..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala +++ /dev/null @@ -1,88 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package akka.kamon.instrumentation - -import kamon.Kamon -import kamon.extension.akka.Akka -import kamon.trace.{ TraceContext, EmptyTraceContext, TraceContextAware } -import akka.actor.{ ActorSystem, ActorRef } -import akka.event.Logging.Warning -import akka.pattern.AskTimeoutException -import org.aspectj.lang.ProceedingJoinPoint -import org.aspectj.lang.annotation._ -import org.aspectj.lang.reflect.SourceLocation -import scala.concurrent.Future -import scala.compat.Platform.EOL - -@Aspect -class AskPatternInstrumentation { - - import AskPatternInstrumentation._ - - @DeclareMixin("akka.pattern.AskableActorRef$") - def mixinContextAwareToAskableActorRef: TraceContextAware = TraceContextAware.default - - @Pointcut("call(* akka.pattern.AskableActorRef$.$qmark$extension(..)) && target(ctx) && args(actor, *, *)") - def askableActorRefAsk(ctx: TraceContextAware, actor: ActorRef): Unit = {} - - @Around("askableActorRefAsk(ctx, actor)") - def hookAskTimeoutWarning(pjp: ProceedingJoinPoint, ctx: TraceContextAware, actor: ActorRef): AnyRef = ctx.traceContext match { - case EmptyTraceContext ⇒ pjp.proceed() - case ctx: TraceContext ⇒ - implicit val system = ctx.system - val akkaExtension = Kamon(Akka)(system) - - val future = pjp.proceed().asInstanceOf[Future[AnyRef]] - - val handler = akkaExtension.askPatternTimeoutWarning match { - case "off" ⇒ None - case "lightweight" ⇒ Some(errorHandler(callInfo = Some(CallInfo(s"${actor.path.name} ?", pjp.getSourceLocation)))) - case "heavyweight" ⇒ Some(errorHandler(stack = Some(new StackTraceCaptureException))) - } - - handler.map(future.onFailure(_)(akkaExtension.dispatcher)) - future - } - - def errorHandler(callInfo: Option[CallInfo] = None, stack: Option[StackTraceCaptureException] = None)(implicit system: ActorSystem): ErrorHandler = { - case e: AskTimeoutException ⇒ - val message = { - if (stack.isDefined) stack.map(s ⇒ s.getStackTrace.drop(3).mkString("", EOL, EOL)) - else callInfo.map(_.message) - } - publish(message) - } - - def publish(message: Option[String])(implicit system: ActorSystem) = message map { msg ⇒ - system.eventStream.publish(Warning("AskPatternTracing", classOf[AskPatternInstrumentation], - s"Timeout triggered for ask pattern registered at: $msg")) - } -} - -object AskPatternInstrumentation { - type ErrorHandler = PartialFunction[Throwable, Unit] - - class StackTraceCaptureException extends Throwable - - case class CallInfo(name: String, sourceLocation: SourceLocation) { - def message: String = { - def locationInfo: String = Option(sourceLocation).map(location ⇒ s"${location.getFileName}:${location.getLine}").getOrElse("") - def line: String = s"$name @ $locationInfo" - s"$line" - } - } -} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala deleted file mode 100644 index 4b1bbc4d..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala +++ /dev/null @@ -1,163 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package akka.kamon.instrumentation - -import java.lang.reflect.Method -import java.util.concurrent.ThreadPoolExecutor - -import akka.actor.{ ActorSystemImpl, Cancellable } -import akka.dispatch.{ Dispatcher, Dispatchers, ExecutorServiceDelegate, MessageDispatcher } -import akka.kamon.instrumentation.DispatcherMetricsCollector.DispatcherMetricsMeasurement -import kamon.Kamon -import kamon.metric.DispatcherMetrics.DispatcherMetricRecorder -import kamon.metric.{ DispatcherMetrics, Metrics } -import org.aspectj.lang.annotation._ - -import scala.concurrent.forkjoin.ForkJoinPool - -@Aspect -class DispatcherInstrumentation { - - @Pointcut("execution(akka.dispatch.Dispatchers.new(..)) && this(dispatchers) && cflow(execution(akka.actor.ActorSystemImpl.new(..)) && this(system))") - def onActorSystemStartup(dispatchers: Dispatchers, system: ActorSystemImpl) = {} - - @Before("onActorSystemStartup(dispatchers, system)") - def beforeActorSystemStartup(dispatchers: Dispatchers, system: ActorSystemImpl): Unit = { - val currentDispatchers = dispatchers.asInstanceOf[DispatchersWithActorSystem] - currentDispatchers.actorSystem = system - } - - @Pointcut("execution(* akka.dispatch.Dispatchers.lookup(..)) && this(dispatchers)") - def onDispatchersLookup(dispatchers: Dispatchers) = {} - - @AfterReturning(pointcut = "onDispatchersLookup(dispatchers)", returning = "dispatcher") - def afterReturningLookup(dispatchers: Dispatchers, dispatcher: Dispatcher): Unit = { - val dispatchersWithActorSystem = dispatchers.asInstanceOf[DispatchersWithActorSystem] - val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMetricCollectionInfo] - - dispatcherWithMetrics.actorSystem = dispatchersWithActorSystem.actorSystem - } - - @Pointcut("call(* akka.dispatch.ExecutorServiceFactory.createExecutorService(..))") - def onCreateExecutorService(): Unit = {} - - @Pointcut("cflow((execution(* akka.dispatch.MessageDispatcher.registerForExecution(..)) || execution(* akka.dispatch.MessageDispatcher.executeTask(..))) && this(dispatcher))") - def onCflowMessageDispatcher(dispatcher: Dispatcher): Unit = {} - - @Pointcut("onCreateExecutorService() && onCflowMessageDispatcher(dispatcher)") - def onDispatcherStartup(dispatcher: Dispatcher): Unit = {} - - @After("onDispatcherStartup(dispatcher)") - def afterDispatcherStartup(dispatcher: MessageDispatcher): Unit = { - - val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMetricCollectionInfo] - val metricsExtension = Kamon(Metrics)(dispatcherWithMetrics.actorSystem) - val metricIdentity = DispatcherMetrics(dispatcher.id) - - dispatcherWithMetrics.metricIdentity = metricIdentity - dispatcherWithMetrics.dispatcherMetricsRecorder = metricsExtension.register(metricIdentity, DispatcherMetrics.Factory) - - if (dispatcherWithMetrics.dispatcherMetricsRecorder.isDefined) { - dispatcherWithMetrics.dispatcherCollectorCancellable = metricsExtension.scheduleGaugeRecorder { - dispatcherWithMetrics.dispatcherMetricsRecorder.map { - dm ⇒ - val DispatcherMetricsMeasurement(maximumPoolSize, runningThreadCount, queueTaskCount, poolSize) = - DispatcherMetricsCollector.collect(dispatcher) - - dm.maximumPoolSize.record(maximumPoolSize) - dm.runningThreadCount.record(runningThreadCount) - dm.queueTaskCount.record(queueTaskCount) - dm.poolSize.record(poolSize) - } - } - } - } - - @Pointcut("execution(* akka.dispatch.MessageDispatcher.shutdown(..)) && this(dispatcher)") - def onDispatcherShutdown(dispatcher: MessageDispatcher): Unit = {} - - @After("onDispatcherShutdown(dispatcher)") - def afterDispatcherShutdown(dispatcher: MessageDispatcher): Unit = { - val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMetricCollectionInfo] - - dispatcherWithMetrics.dispatcherMetricsRecorder.map { - dispatcher ⇒ - dispatcherWithMetrics.dispatcherCollectorCancellable.cancel() - Kamon(Metrics)(dispatcherWithMetrics.actorSystem).unregister(dispatcherWithMetrics.metricIdentity) - } - } -} - -@Aspect -class DispatcherMetricCollectionInfoIntoDispatcherMixin { - - @DeclareMixin("akka.dispatch.MessageDispatcher") - def mixinDispatcherMetricsToMessageDispatcher: DispatcherMetricCollectionInfo = new DispatcherMetricCollectionInfo {} - - @DeclareMixin("akka.dispatch.Dispatchers") - def mixinDispatchersToDispatchersWithActorSystem: DispatchersWithActorSystem = new DispatchersWithActorSystem {} -} - -trait DispatcherMetricCollectionInfo { - var metricIdentity: DispatcherMetrics = _ - var dispatcherMetricsRecorder: Option[DispatcherMetricRecorder] = _ - var dispatcherCollectorCancellable: Cancellable = _ - var actorSystem: ActorSystemImpl = _ -} - -trait DispatchersWithActorSystem { - var actorSystem: ActorSystemImpl = _ -} - -object DispatcherMetricsCollector { - - case class DispatcherMetricsMeasurement(maximumPoolSize: Long, runningThreadCount: Long, queueTaskCount: Long, poolSize: Long) - - private def collectForkJoinMetrics(pool: ForkJoinPool): DispatcherMetricsMeasurement = { - DispatcherMetricsMeasurement(pool.getParallelism, pool.getActiveThreadCount, - (pool.getQueuedTaskCount + pool.getQueuedSubmissionCount), pool.getPoolSize) - } - - private def collectExecutorMetrics(pool: ThreadPoolExecutor): DispatcherMetricsMeasurement = { - DispatcherMetricsMeasurement(pool.getMaximumPoolSize, pool.getActiveCount, pool.getQueue.size(), pool.getPoolSize) - } - - private val executorServiceMethod: Method = { - // executorService is protected - val method = classOf[Dispatcher].getDeclaredMethod("executorService") - method.setAccessible(true) - method - } - - def collect(dispatcher: MessageDispatcher): DispatcherMetricsMeasurement = { - dispatcher match { - case x: Dispatcher ⇒ { - val executor = executorServiceMethod.invoke(x) match { - case delegate: ExecutorServiceDelegate ⇒ delegate.executor - case other ⇒ other - } - - executor match { - case fjp: ForkJoinPool ⇒ collectForkJoinMetrics(fjp) - case tpe: ThreadPoolExecutor ⇒ collectExecutorMetrics(tpe) - case anything ⇒ DispatcherMetricsMeasurement(0L, 0L, 0L, 0L) - } - } - case _ ⇒ new DispatcherMetricsMeasurement(0L, 0L, 0L, 0L) - } - } -} diff --git a/kamon-core/src/main/scala/kamon/metric/ActorMetrics.scala b/kamon-core/src/main/scala/kamon/metric/ActorMetrics.scala deleted file mode 100644 index d2cb4e38..00000000 --- a/kamon-core/src/main/scala/kamon/metric/ActorMetrics.scala +++ /dev/null @@ -1,93 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metric - -import akka.actor.ActorSystem -import com.typesafe.config.Config -import kamon.metric.instrument.{ MinMaxCounter, Counter, Histogram } - -case class ActorMetrics(name: String) extends MetricGroupIdentity { - val category = ActorMetrics -} - -object ActorMetrics extends MetricGroupCategory { - val name = "actor" - - case object ProcessingTime extends MetricIdentity { val name = "processing-time" } - case object MailboxSize extends MetricIdentity { val name = "mailbox-size" } - case object TimeInMailbox extends MetricIdentity { val name = "time-in-mailbox" } - case object Errors extends MetricIdentity { val name = "errors" } - - case class ActorMetricsRecorder(processingTime: Histogram, timeInMailbox: Histogram, mailboxSize: MinMaxCounter, - errors: Counter) extends MetricGroupRecorder { - - def collect(context: CollectionContext): ActorMetricSnapshot = - ActorMetricSnapshot( - processingTime.collect(context), - timeInMailbox.collect(context), - mailboxSize.collect(context), - errors.collect(context)) - - def cleanup: Unit = { - processingTime.cleanup - mailboxSize.cleanup - timeInMailbox.cleanup - errors.cleanup - } - } - - case class ActorMetricSnapshot(processingTime: Histogram.Snapshot, timeInMailbox: Histogram.Snapshot, - mailboxSize: Histogram.Snapshot, errors: Counter.Snapshot) extends MetricGroupSnapshot { - - type GroupSnapshotType = ActorMetricSnapshot - - def merge(that: ActorMetricSnapshot, context: CollectionContext): ActorMetricSnapshot = - ActorMetricSnapshot( - processingTime.merge(that.processingTime, context), - timeInMailbox.merge(that.timeInMailbox, context), - mailboxSize.merge(that.mailboxSize, context), - errors.merge(that.errors, context)) - - lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map( - (ProcessingTime -> processingTime), - (MailboxSize -> mailboxSize), - (TimeInMailbox -> timeInMailbox), - (Errors -> errors)) - } - - val Factory = ActorMetricGroupFactory -} - -case object ActorMetricGroupFactory extends MetricGroupFactory { - import ActorMetrics._ - - type GroupRecorder = ActorMetricsRecorder - - def create(config: Config, system: ActorSystem): ActorMetricsRecorder = { - val settings = config.getConfig("precision.actor") - - val processingTimeConfig = settings.getConfig("processing-time") - val timeInMailboxConfig = settings.getConfig("time-in-mailbox") - val mailboxSizeConfig = settings.getConfig("mailbox-size") - - new ActorMetricsRecorder( - Histogram.fromConfig(processingTimeConfig), - Histogram.fromConfig(timeInMailboxConfig), - MinMaxCounter.fromConfig(mailboxSizeConfig, system), - Counter()) - } -} diff --git a/kamon-core/src/main/scala/kamon/metric/DispatcherMetrics.scala b/kamon-core/src/main/scala/kamon/metric/DispatcherMetrics.scala deleted file mode 100644 index 126f6333..00000000 --- a/kamon-core/src/main/scala/kamon/metric/DispatcherMetrics.scala +++ /dev/null @@ -1,93 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metric - -import akka.actor.ActorSystem -import com.typesafe.config.Config -import kamon.metric.instrument.{ Histogram, HdrHistogram } - -case class DispatcherMetrics(name: String) extends MetricGroupIdentity { - val category = DispatcherMetrics -} - -object DispatcherMetrics extends MetricGroupCategory { - val name = "dispatcher" - - case object MaximumPoolSize extends MetricIdentity { val name = "maximum-pool-size" } - case object RunningThreadCount extends MetricIdentity { val name = "running-thread-count" } - case object QueueTaskCount extends MetricIdentity { val name = "queued-task-count" } - case object PoolSize extends MetricIdentity { val name = "pool-size" } - - case class DispatcherMetricRecorder(maximumPoolSize: Histogram, runningThreadCount: Histogram, - queueTaskCount: Histogram, poolSize: Histogram) - extends MetricGroupRecorder { - - def collect(context: CollectionContext): MetricGroupSnapshot = - DispatcherMetricSnapshot( - maximumPoolSize.collect(context), - runningThreadCount.collect(context), - queueTaskCount.collect(context), - poolSize.collect(context)) - - def cleanup: Unit = {} - - } - - case class DispatcherMetricSnapshot(maximumPoolSize: Histogram.Snapshot, runningThreadCount: Histogram.Snapshot, - queueTaskCount: Histogram.Snapshot, poolSize: Histogram.Snapshot) extends MetricGroupSnapshot { - - type GroupSnapshotType = DispatcherMetricSnapshot - - def merge(that: DispatcherMetricSnapshot, context: CollectionContext): DispatcherMetricSnapshot = - DispatcherMetricSnapshot( - maximumPoolSize.merge(that.maximumPoolSize, context), - runningThreadCount.merge(that.runningThreadCount, context), - queueTaskCount.merge(that.queueTaskCount, context), - poolSize.merge(that.poolSize, context)) - - lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map( - (MaximumPoolSize -> maximumPoolSize), - (RunningThreadCount -> runningThreadCount), - (QueueTaskCount -> queueTaskCount), - (PoolSize -> poolSize)) - } - - val Factory = DispatcherMetricGroupFactory -} - -case object DispatcherMetricGroupFactory extends MetricGroupFactory { - - import DispatcherMetrics._ - - type GroupRecorder = DispatcherMetricRecorder - - def create(config: Config, system: ActorSystem): DispatcherMetricRecorder = { - val settings = config.getConfig("precision.dispatcher") - - val maximumPoolSizeConfig = settings.getConfig("maximum-pool-size") - val runningThreadCountConfig = settings.getConfig("running-thread-count") - val queueTaskCountConfig = settings.getConfig("queued-task-count") - val poolSizeConfig = settings.getConfig("pool-size") - - new DispatcherMetricRecorder( - Histogram.fromConfig(maximumPoolSizeConfig), - Histogram.fromConfig(runningThreadCountConfig), - Histogram.fromConfig(queueTaskCountConfig), - Histogram.fromConfig(poolSizeConfig)) - } - -} diff --git a/kamon-core/src/main/scala/kamon/metric/RouterMetrics.scala b/kamon-core/src/main/scala/kamon/metric/RouterMetrics.scala deleted file mode 100644 index ddfef416..00000000 --- a/kamon-core/src/main/scala/kamon/metric/RouterMetrics.scala +++ /dev/null @@ -1,82 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ -package kamon.metric - -import akka.actor.ActorSystem -import com.typesafe.config.Config -import kamon.metric.instrument.{ Counter, Histogram } - -case class RouterMetrics(name: String) extends MetricGroupIdentity { - val category = RouterMetrics -} - -object RouterMetrics extends MetricGroupCategory { - val name = "router" - - case object ProcessingTime extends MetricIdentity { val name = "processing-time" } - case object TimeInMailbox extends MetricIdentity { val name = "time-in-mailbox" } - case object Errors extends MetricIdentity { val name = "errors" } - - case class RouterMetricsRecorder(processingTime: Histogram, timeInMailbox: Histogram, errors: Counter) extends MetricGroupRecorder { - - def collect(context: CollectionContext): RouterMetricSnapshot = - RouterMetricSnapshot(processingTime.collect(context), timeInMailbox.collect(context), errors.collect(context)) - - def cleanup: Unit = { - processingTime.cleanup - timeInMailbox.cleanup - errors.cleanup - } - } - - case class RouterMetricSnapshot(processingTime: Histogram.Snapshot, timeInMailbox: Histogram.Snapshot, errors: Counter.Snapshot) extends MetricGroupSnapshot { - - type GroupSnapshotType = RouterMetricSnapshot - - def merge(that: RouterMetricSnapshot, context: CollectionContext): RouterMetricSnapshot = - RouterMetricSnapshot( - processingTime.merge(that.processingTime, context), - timeInMailbox.merge(that.timeInMailbox, context), - errors.merge(that.errors, context)) - - lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map( - ProcessingTime -> processingTime, - TimeInMailbox -> timeInMailbox, - Errors -> errors) - } - - val Factory = RouterMetricGroupFactory -} - -case object RouterMetricGroupFactory extends MetricGroupFactory { - - import RouterMetrics._ - - type GroupRecorder = RouterMetricsRecorder - - def create(config: Config, system: ActorSystem): RouterMetricsRecorder = { - val settings = config.getConfig("precision.router") - - val processingTimeConfig = settings.getConfig("processing-time") - val timeInMailboxConfig = settings.getConfig("time-in-mailbox") - - new RouterMetricsRecorder( - Histogram.fromConfig(processingTimeConfig), - Histogram.fromConfig(timeInMailboxConfig), - Counter()) - } -} - -- cgit v1.2.3