From 20e0193071b483b68c53b55bd5502d05f0631ec6 Mon Sep 17 00:00:00 2001 From: Diego Date: Sat, 27 Dec 2014 17:09:38 -0300 Subject: + system-metrics: fix #135 and includes the following new metrics: ** DiskMetrics ** NonHeapMetrics ** LoadAverageMetrics ** ThreadMetrics ** ClassLoadingMetrics and closes #131 --- kamon-core/src/test/scala/kamon/util/GlobPathFilterSpec.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'kamon-core') diff --git a/kamon-core/src/test/scala/kamon/util/GlobPathFilterSpec.scala b/kamon-core/src/test/scala/kamon/util/GlobPathFilterSpec.scala index 47ef4701..83992e61 100644 --- a/kamon-core/src/test/scala/kamon/util/GlobPathFilterSpec.scala +++ b/kamon-core/src/test/scala/kamon/util/GlobPathFilterSpec.scala @@ -16,7 +16,7 @@ package kamon.util -import org.scalatest.{Matchers, WordSpecLike} +import org.scalatest.{ Matchers, WordSpecLike } class GlobPathFilterSpec extends WordSpecLike with Matchers { "The GlobPathFilter" should { @@ -41,18 +41,18 @@ class GlobPathFilterSpec extends WordSpecLike with Matchers { } "match all expressions and crosses the path boundaries" in { - val filter = new GlobPathFilter("/user/actor-**") + val filter = new GlobPathFilter("/user/actor-**") filter.accept("/user/actor-") shouldBe true filter.accept("/user/actor-one") shouldBe true filter.accept("/user/actor-one/other") shouldBe true filter.accept("/user/something/actor") shouldBe false - filter.accept("/user/something/otherActor")shouldBe false + filter.accept("/user/something/otherActor") shouldBe false } "match exactly one characterr" in { - val filter = new GlobPathFilter("/user/actor-?") + val filter = new GlobPathFilter("/user/actor-?") filter.accept("/user/actor-1") shouldBe true filter.accept("/user/actor-2") shouldBe true -- cgit v1.2.3 From ff7ac0ec79dd61849b4c76b10f74af72c7cceea9 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Sun, 28 Dec 2014 07:59:59 +0100 Subject: ! core,akka: separate all akka instrumentation into it's own kamon-akka module, related to #136. All Akka-related instrumentation and code has been moved to the kamon-akka module, including the filters for actor, dispatcher and router metrics. Also the following changes are included: - Router Metrics are now working properly, related to #139. - Cleanup the log output for this module, related to #142. - Some minor cleanups in various tests. This PR breaks the reporting modules which will need to wait for #141 to be ready to come back to life. --- .../protobuf/TraceContextAwareWireFormats.proto | 31 --- kamon-core/src/main/protobuf/WireFormats.proto | 132 ------------ kamon-core/src/main/resources/META-INF/aop.xml | 21 -- kamon-core/src/main/resources/reference.conf | 44 ---- .../src/main/scala/kamon/akka/AkkaExtension.scala | 32 --- .../akka/ActorCellInstrumentation.scala | 176 ---------------- .../akka/ActorLoggingInstrumentation.scala | 50 ----- .../akka/ActorSystemMessageInstrumentation.scala | 80 -------- .../akka/AskPatternInstrumentation.scala | 88 -------- .../akka/DispatcherInstrumentation.scala | 163 --------------- .../src/main/scala/kamon/metric/ActorMetrics.scala | 93 --------- .../scala/kamon/metric/DispatcherMetrics.scala | 93 --------- .../main/scala/kamon/metric/RouterMetrics.scala | 82 -------- .../akka/ActorCellInstrumentationSpec.scala | 87 -------- .../akka/ActorLoggingInstrumentationSpec.scala | 73 ------- .../ActorSystemMessageInstrumentationSpec.scala | 172 ---------------- .../akka/AskPatternInstrumentationSpec.scala | 66 ------ .../test/scala/kamon/metric/ActorMetricsSpec.scala | 224 --------------------- .../scala/kamon/metric/DispatcherMetricsSpec.scala | 108 ---------- .../scala/kamon/metric/RouterMetricsSpec.scala | 161 --------------- 20 files changed, 1976 deletions(-) delete mode 100644 kamon-core/src/main/protobuf/TraceContextAwareWireFormats.proto delete mode 100644 kamon-core/src/main/protobuf/WireFormats.proto delete mode 100644 kamon-core/src/main/scala/kamon/akka/AkkaExtension.scala delete mode 100644 kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala delete mode 100644 kamon-core/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala delete mode 100644 kamon-core/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala delete mode 100644 kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala delete mode 100644 kamon-core/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala delete mode 100644 kamon-core/src/main/scala/kamon/metric/ActorMetrics.scala delete mode 100644 kamon-core/src/main/scala/kamon/metric/DispatcherMetrics.scala delete mode 100644 kamon-core/src/main/scala/kamon/metric/RouterMetrics.scala delete mode 100644 kamon-core/src/test/scala/kamon/instrumentation/akka/ActorCellInstrumentationSpec.scala delete mode 100644 kamon-core/src/test/scala/kamon/instrumentation/akka/ActorLoggingInstrumentationSpec.scala delete mode 100644 kamon-core/src/test/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentationSpec.scala delete mode 100644 kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala delete mode 100644 kamon-core/src/test/scala/kamon/metric/ActorMetricsSpec.scala delete mode 100644 kamon-core/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala delete mode 100644 kamon-core/src/test/scala/kamon/metric/RouterMetricsSpec.scala (limited to 'kamon-core') diff --git a/kamon-core/src/main/protobuf/TraceContextAwareWireFormats.proto b/kamon-core/src/main/protobuf/TraceContextAwareWireFormats.proto deleted file mode 100644 index d4ee21b5..00000000 --- a/kamon-core/src/main/protobuf/TraceContextAwareWireFormats.proto +++ /dev/null @@ -1,31 +0,0 @@ -import "WireFormats.proto"; - -option java_package = "akka.remote.instrumentation"; -option optimize_for = SPEED; - - -/************************************************ - * Kamon-specific additions to the protocol - ************************************************/ - -message AckAndTraceContextAwareEnvelopeContainer { - optional AcknowledgementInfo ack = 1; - optional TraceContextAwareRemoteEnvelope envelope = 2; -} - -message TraceContextAwareRemoteEnvelope { - required ActorRefData recipient = 1; - required SerializedMessage message = 2; - optional ActorRefData sender = 4; - optional fixed64 seq = 5; - - optional RemoteTraceContext traceContext = 15; -} - -message RemoteTraceContext { - required string traceName = 1; - required string traceToken = 2; - required bool isOpen = 3; - required fixed64 startMilliTime = 4; -} - diff --git a/kamon-core/src/main/protobuf/WireFormats.proto b/kamon-core/src/main/protobuf/WireFormats.proto deleted file mode 100644 index 98a954cc..00000000 --- a/kamon-core/src/main/protobuf/WireFormats.proto +++ /dev/null @@ -1,132 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ - -// Extracted from https://github.com/akka/akka/blob/master/akka-remote/src/main/protobuf/WireFormats.proto - - -option java_package = "akka.remote"; -option optimize_for = SPEED; - -/****************************************** - * Remoting message formats - ******************************************/ - - -message AckAndEnvelopeContainer { - optional AcknowledgementInfo ack = 1; - optional RemoteEnvelope envelope = 2; -} - -/** - * Defines a remote message. - */ -message RemoteEnvelope { - required ActorRefData recipient = 1; - required SerializedMessage message = 2; - optional ActorRefData sender = 4; - optional fixed64 seq = 5; -} - -message AcknowledgementInfo { - required fixed64 cumulativeAck = 1; - repeated fixed64 nacks = 2; -} - -/** - * Defines a remote ActorRef that "remembers" and uses its original Actor instance - * on the original node. - */ -message ActorRefData { - required string path = 1; -} - -/** - * Defines a message. - */ -message SerializedMessage { - required bytes message = 1; - required int32 serializerId = 2; - optional bytes messageManifest = 3; -} - -/** - * Defines akka.remote.DaemonMsgCreate - */ -message DaemonMsgCreateData { - required PropsData props = 1; - required DeployData deploy = 2; - required string path = 3; - required ActorRefData supervisor = 4; -} - -/** - * Serialization of akka.actor.Props - */ -message PropsData { - required DeployData deploy = 2; - required string clazz = 3; - repeated bytes args = 4; - repeated string classes = 5; -} - -/** - * Serialization of akka.actor.Deploy - */ -message DeployData { - required string path = 1; - optional bytes config = 2; - optional bytes routerConfig = 3; - optional bytes scope = 4; - optional string dispatcher = 5; -} - - -/****************************************** - * Akka Protocol message formats - ******************************************/ - -/** - * Message format of Akka Protocol. - * Message contains either a payload or an instruction. - */ -message AkkaProtocolMessage { - optional bytes payload = 1; - optional AkkaControlMessage instruction = 2; -} - -/** - * Defines some control messages for the remoting - */ -message AkkaControlMessage { - required CommandType commandType = 1; - optional AkkaHandshakeInfo handshakeInfo = 2; -} - -message AkkaHandshakeInfo { - required AddressData origin = 1; - required fixed64 uid = 2; - optional string cookie = 3; - -} - -/** - * Defines the type of the AkkaControlMessage command type - */ -enum CommandType { - ASSOCIATE = 1; - DISASSOCIATE = 2; - HEARTBEAT = 3; - DISASSOCIATE_SHUTTING_DOWN = 4; // Remote system is going down and will not accepts new connections - DISASSOCIATE_QUARANTINED = 5; // Remote system refused the association since the current system is quarantined -} - -/** - * Defines a remote address. - */ -message AddressData { - required string system = 1; - required string hostname = 2; - required uint32 port = 3; - optional string protocol = 4; -} diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop.xml index 8c7eb235..47ce11d8 100644 --- a/kamon-core/src/main/resources/META-INF/aop.xml +++ b/kamon-core/src/main/resources/META-INF/aop.xml @@ -5,38 +5,17 @@ - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index 2d8b3f2e..8f5a8b45 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -87,41 +87,10 @@ kamon { significant-value-digits = 2 } - - actor { - processing-time = ${kamon.metrics.precision.default-histogram-precision} - time-in-mailbox = ${kamon.metrics.precision.default-histogram-precision} - mailbox-size = ${kamon.metrics.precision.default-min-max-counter-precision} - } - - router { - processing-time = ${kamon.metrics.precision.default-histogram-precision} - time-in-mailbox = ${kamon.metrics.precision.default-histogram-precision} - } - trace { elapsed-time = ${kamon.metrics.precision.default-histogram-precision} segment = ${kamon.metrics.precision.default-histogram-precision} } - - dispatcher { - maximum-pool-size { - highest-trackable-value = 999999999 - significant-value-digits = 2 - } - running-thread-count { - highest-trackable-value = 999999999 - significant-value-digits = 2 - } - queued-task-count { - highest-trackable-value = 999999999 - significant-value-digits = 2 - } - pool-size { - highest-trackable-value = 999999999 - significant-value-digits = 2 - } - } } } @@ -176,19 +145,6 @@ kamon { dispatcher = ${kamon.default-dispatcher} } - akka { - # If ask-pattern-timeout-warning is enabled, a WARN level log message will be generated if a future generated by the `ask` - # pattern fails with a `AskTimeoutException` and the log message will contain information depending of the strategy selected. - # strategies: - # - off: nothing to do. - # - lightweight: logs the warning when a timeout is reached using org.aspectj.lang.reflect.SourceLocation. - # - heavyweight: logs the warning when a timeout is reached using a stack trace captured at the moment the future was created. - ask-pattern-timeout-warning = off - - # Default dispatcher for all akka module operations - dispatcher = ${kamon.default-dispatcher} - } - kamon-dispatcher { # Dispatcher is the name of the event-based dispatcher type = Dispatcher diff --git a/kamon-core/src/main/scala/kamon/akka/AkkaExtension.scala b/kamon-core/src/main/scala/kamon/akka/AkkaExtension.scala deleted file mode 100644 index 2fc7395e..00000000 --- a/kamon-core/src/main/scala/kamon/akka/AkkaExtension.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ -// -package kamon.extension.akka - -import akka.actor -import akka.actor._ -import kamon._ - -class AkkaExtension(system: ExtendedActorSystem) extends Kamon.Extension { - val config = system.settings.config.getConfig("kamon.akka") - val askPatternTimeoutWarning = config.getString("ask-pattern-timeout-warning") - val dispatcher = system.dispatchers.lookup(config.getString("dispatcher")) -} - -object Akka extends ExtensionId[AkkaExtension] with ExtensionIdProvider { - def lookup(): ExtensionId[_ <: actor.Extension] = Akka - def createExtension(system: ExtendedActorSystem): AkkaExtension = new AkkaExtension(system) -} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala deleted file mode 100644 index a340566d..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala +++ /dev/null @@ -1,176 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package akka.kamon.instrumentation - -import akka.actor._ -import akka.dispatch.{ Envelope, MessageDispatcher } -import akka.routing.RoutedActorCell -import kamon.Kamon -import kamon.metric.ActorMetrics.ActorMetricsRecorder -import kamon.metric.RouterMetrics.RouterMetricsRecorder -import kamon.metric.{ ActorMetrics, Metrics, RouterMetrics } -import kamon.trace._ -import org.aspectj.lang.ProceedingJoinPoint -import org.aspectj.lang.annotation._ - -@Aspect -class ActorCellInstrumentation { - - import ActorCellInstrumentation.PimpedActorCellMetrics - - @Pointcut("execution(akka.actor.ActorCell.new(..)) && this(cell) && args(system, ref, props, dispatcher, parent)") - def actorCellCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {} - - @After("actorCellCreation(cell, system, ref, props, dispatcher, parent)") - def afterCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = { - - val metricsExtension = Kamon(Metrics)(system) - val metricIdentity = ActorMetrics(ref.path.elements.mkString("/")) - val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - - cellWithMetrics.actorMetricIdentity = metricIdentity - cellWithMetrics.actorMetricsRecorder = metricsExtension.register(metricIdentity, ActorMetrics.Factory) - - cellWithMetrics.onRoutedActorCell { routedActorCell ⇒ - val routerMetricIdentity = RouterMetrics(s"${routedActorCell.asInstanceOf[RoutedActorCell].self.path.elements.mkString("/")}") - routedActorCell.routerMetricIdentity = routerMetricIdentity - routedActorCell.routerMetricsRecorder = metricsExtension.register(routerMetricIdentity, RouterMetrics.Factory) - } - } - - @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && this(cell) && args(envelope)") - def invokingActorBehaviourAtActorCell(cell: ActorCell, envelope: Envelope) = {} - - @Around("invokingActorBehaviourAtActorCell(cell, envelope)") - def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, cell: ActorCell, envelope: Envelope): Any = { - val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - val timestampBeforeProcessing = System.nanoTime() - val contextAndTimestamp = envelope.asInstanceOf[TimestampedTraceContextAware] - - try { - TraceRecorder.withInlineTraceContextReplacement(contextAndTimestamp.traceContext) { - pjp.proceed() - } - } finally { - cellWithMetrics.actorMetricsRecorder.map { - am ⇒ - val processingTime = System.nanoTime() - timestampBeforeProcessing - val timeInMailbox = timestampBeforeProcessing - contextAndTimestamp.captureNanoTime - - am.processingTime.record(processingTime) - am.timeInMailbox.record(timeInMailbox) - am.mailboxSize.decrement() - - (processingTime, timeInMailbox) - } map { - case (processingTime, timeInMailbox) ⇒ - cellWithMetrics.onRoutedActorCell { routedActorCell ⇒ - routedActorCell.routerMetricsRecorder.map { - rm ⇒ - rm.processingTime.record(processingTime) - rm.timeInMailbox.record(timeInMailbox) - } - } - } - } - } - - @Pointcut("execution(* akka.actor.ActorCell.sendMessage(*)) && this(cell)") - def sendingMessageToActorCell(cell: ActorCell): Unit = {} - - @After("sendingMessageToActorCell(cell)") - def afterSendMessageToActorCell(cell: ActorCell): Unit = { - val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - cellWithMetrics.actorMetricsRecorder.map(am ⇒ am.mailboxSize.increment()) - } - - @Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)") - def actorStop(cell: ActorCell): Unit = {} - - @After("actorStop(cell)") - def afterStop(cell: ActorCell): Unit = { - val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - - cellWithMetrics.actorMetricsRecorder.map { p ⇒ - Kamon(Metrics)(cell.system).unregister(cellWithMetrics.actorMetricIdentity) - } - - cellWithMetrics.onRoutedActorCell { routedActorCell ⇒ - routedActorCell.routerMetricsRecorder.map { rm ⇒ - Kamon(Metrics)(cell.system).unregister(cellWithMetrics.routerMetricIdentity) - } - } - } - - @Pointcut("execution(* akka.actor.ActorCell.handleInvokeFailure(..)) && this(cell)") - def actorInvokeFailure(cell: ActorCell): Unit = {} - - @Before("actorInvokeFailure(cell)") - def beforeInvokeFailure(cell: ActorCell): Unit = { - val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - - cellWithMetrics.actorMetricsRecorder.map { - am ⇒ am.errors.increment() - } - - cellWithMetrics.onRoutedActorCell { routedActorCell ⇒ - routedActorCell.routerMetricsRecorder.map { - rm ⇒ rm.errors.increment() - } - } - } - -} - -trait ActorCellMetrics { - var actorMetricIdentity: ActorMetrics = _ - var routerMetricIdentity: RouterMetrics = _ - var actorMetricsRecorder: Option[ActorMetricsRecorder] = _ - var routerMetricsRecorder: Option[RouterMetricsRecorder] = _ -} - -@Aspect -class ActorCellMetricsIntoActorCellMixin { - - @DeclareMixin("akka.actor.ActorCell") - def mixinActorCellMetricsToActorCell: ActorCellMetrics = new ActorCellMetrics {} -} - -@Aspect -class TraceContextIntoEnvelopeMixin { - - @DeclareMixin("akka.dispatch.Envelope") - def mixinTraceContextAwareToEnvelope: TimestampedTraceContextAware = TimestampedTraceContextAware.default - - @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)") - def envelopeCreation(ctx: TimestampedTraceContextAware): Unit = {} - - @After("envelopeCreation(ctx)") - def afterEnvelopeCreation(ctx: TimestampedTraceContextAware): Unit = { - // Necessary to force the initialization of ContextAware at the moment of creation. - ctx.traceContext - } -} - -object ActorCellInstrumentation { - implicit class PimpedActorCellMetrics(cell: ActorCellMetrics) { - def onRoutedActorCell(block: ActorCellMetrics ⇒ Unit) = cell match { - case routedActorCell: RoutedActorCell ⇒ block(cell) - case everythingElse ⇒ - } - } -} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala deleted file mode 100644 index e0e5d316..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package akka.kamon.instrumentation - -import kamon.trace.logging.MdcKeysSupport -import kamon.trace.{ TraceContextAware, TraceRecorder } -import org.aspectj.lang.ProceedingJoinPoint -import org.aspectj.lang.annotation._ - -@Aspect -class ActorLoggingInstrumentation extends MdcKeysSupport { - - @DeclareMixin("akka.event.Logging.LogEvent+") - def mixinTraceContextAwareToLogEvent: TraceContextAware = TraceContextAware.default - - @Pointcut("execution(akka.event.Logging.LogEvent+.new(..)) && this(event)") - def logEventCreation(event: TraceContextAware): Unit = {} - - @After("logEventCreation(event)") - def captureTraceContext(event: TraceContextAware): Unit = { - // Force initialization of TraceContextAware - event.traceContext - } - - @Pointcut("execution(* akka.event.slf4j.Slf4jLogger.withMdc(..)) && args(logSource, logEvent, logStatement)") - def withMdcInvocation(logSource: String, logEvent: TraceContextAware, logStatement: () ⇒ _): Unit = {} - - @Around("withMdcInvocation(logSource, logEvent, logStatement)") - def aroundWithMdcInvocation(pjp: ProceedingJoinPoint, logSource: String, logEvent: TraceContextAware, logStatement: () ⇒ _): Unit = { - TraceRecorder.withInlineTraceContextReplacement(logEvent.traceContext) { - withMdc { - pjp.proceed() - } - } - } -} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala deleted file mode 100644 index 48016876..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala +++ /dev/null @@ -1,80 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package akka.kamon.instrumentation - -import akka.dispatch.sysmsg.EarliestFirstSystemMessageList -import kamon.trace.{ TraceContextAware, TraceRecorder } -import org.aspectj.lang.ProceedingJoinPoint -import org.aspectj.lang.annotation._ - -@Aspect -class ActorSystemMessageInstrumentation { - - @Pointcut("execution(* akka.actor.ActorCell.invokeAll$1(..)) && args(messages, *)") - def systemMessageProcessing(messages: EarliestFirstSystemMessageList): Unit = {} - - @Around("systemMessageProcessing(messages)") - def aroundSystemMessageInvoke(pjp: ProceedingJoinPoint, messages: EarliestFirstSystemMessageList): Any = { - if (messages.nonEmpty) { - val ctx = messages.head.asInstanceOf[TraceContextAware].traceContext - TraceRecorder.withInlineTraceContextReplacement(ctx)(pjp.proceed()) - - } else pjp.proceed() - } -} - -@Aspect -class TraceContextIntoSystemMessageMixin { - - @DeclareMixin("akka.dispatch.sysmsg.SystemMessage+") - def mixinTraceContextAwareToSystemMessage: TraceContextAware = TraceContextAware.default - - @Pointcut("execution(akka.dispatch.sysmsg.SystemMessage+.new(..)) && this(ctx)") - def envelopeCreation(ctx: TraceContextAware): Unit = {} - - @After("envelopeCreation(ctx)") - def afterEnvelopeCreation(ctx: TraceContextAware): Unit = { - // Necessary to force the initialization of ContextAware at the moment of creation. - ctx.traceContext - } -} - -@Aspect -class TraceContextIntoRepointableActorRefMixin { - - @DeclareMixin("akka.actor.RepointableActorRef") - def mixinTraceContextAwareToRepointableActorRef: TraceContextAware = TraceContextAware.default - - @Pointcut("execution(akka.actor.RepointableActorRef.new(..)) && this(ctx)") - def envelopeCreation(ctx: TraceContextAware): Unit = {} - - @After("envelopeCreation(ctx)") - def afterEnvelopeCreation(ctx: TraceContextAware): Unit = { - // Necessary to force the initialization of ContextAware at the moment of creation. - ctx.traceContext - } - - @Pointcut("execution(* akka.actor.RepointableActorRef.point(..)) && this(repointableActorRef)") - def repointableActorRefCreation(repointableActorRef: TraceContextAware): Unit = {} - - @Around("repointableActorRefCreation(repointableActorRef)") - def afterRepointableActorRefCreation(pjp: ProceedingJoinPoint, repointableActorRef: TraceContextAware): Any = { - TraceRecorder.withInlineTraceContextReplacement(repointableActorRef.traceContext) { - pjp.proceed() - } - } -} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala deleted file mode 100644 index 5b4fbbc8..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala +++ /dev/null @@ -1,88 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package akka.kamon.instrumentation - -import kamon.Kamon -import kamon.extension.akka.Akka -import kamon.trace.{ TraceContext, EmptyTraceContext, TraceContextAware } -import akka.actor.{ ActorSystem, ActorRef } -import akka.event.Logging.Warning -import akka.pattern.AskTimeoutException -import org.aspectj.lang.ProceedingJoinPoint -import org.aspectj.lang.annotation._ -import org.aspectj.lang.reflect.SourceLocation -import scala.concurrent.Future -import scala.compat.Platform.EOL - -@Aspect -class AskPatternInstrumentation { - - import AskPatternInstrumentation._ - - @DeclareMixin("akka.pattern.AskableActorRef$") - def mixinContextAwareToAskableActorRef: TraceContextAware = TraceContextAware.default - - @Pointcut("call(* akka.pattern.AskableActorRef$.$qmark$extension(..)) && target(ctx) && args(actor, *, *)") - def askableActorRefAsk(ctx: TraceContextAware, actor: ActorRef): Unit = {} - - @Around("askableActorRefAsk(ctx, actor)") - def hookAskTimeoutWarning(pjp: ProceedingJoinPoint, ctx: TraceContextAware, actor: ActorRef): AnyRef = ctx.traceContext match { - case EmptyTraceContext ⇒ pjp.proceed() - case ctx: TraceContext ⇒ - implicit val system = ctx.system - val akkaExtension = Kamon(Akka)(system) - - val future = pjp.proceed().asInstanceOf[Future[AnyRef]] - - val handler = akkaExtension.askPatternTimeoutWarning match { - case "off" ⇒ None - case "lightweight" ⇒ Some(errorHandler(callInfo = Some(CallInfo(s"${actor.path.name} ?", pjp.getSourceLocation)))) - case "heavyweight" ⇒ Some(errorHandler(stack = Some(new StackTraceCaptureException))) - } - - handler.map(future.onFailure(_)(akkaExtension.dispatcher)) - future - } - - def errorHandler(callInfo: Option[CallInfo] = None, stack: Option[StackTraceCaptureException] = None)(implicit system: ActorSystem): ErrorHandler = { - case e: AskTimeoutException ⇒ - val message = { - if (stack.isDefined) stack.map(s ⇒ s.getStackTrace.drop(3).mkString("", EOL, EOL)) - else callInfo.map(_.message) - } - publish(message) - } - - def publish(message: Option[String])(implicit system: ActorSystem) = message map { msg ⇒ - system.eventStream.publish(Warning("AskPatternTracing", classOf[AskPatternInstrumentation], - s"Timeout triggered for ask pattern registered at: $msg")) - } -} - -object AskPatternInstrumentation { - type ErrorHandler = PartialFunction[Throwable, Unit] - - class StackTraceCaptureException extends Throwable - - case class CallInfo(name: String, sourceLocation: SourceLocation) { - def message: String = { - def locationInfo: String = Option(sourceLocation).map(location ⇒ s"${location.getFileName}:${location.getLine}").getOrElse("") - def line: String = s"$name @ $locationInfo" - s"$line" - } - } -} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala deleted file mode 100644 index 4b1bbc4d..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/akka/DispatcherInstrumentation.scala +++ /dev/null @@ -1,163 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package akka.kamon.instrumentation - -import java.lang.reflect.Method -import java.util.concurrent.ThreadPoolExecutor - -import akka.actor.{ ActorSystemImpl, Cancellable } -import akka.dispatch.{ Dispatcher, Dispatchers, ExecutorServiceDelegate, MessageDispatcher } -import akka.kamon.instrumentation.DispatcherMetricsCollector.DispatcherMetricsMeasurement -import kamon.Kamon -import kamon.metric.DispatcherMetrics.DispatcherMetricRecorder -import kamon.metric.{ DispatcherMetrics, Metrics } -import org.aspectj.lang.annotation._ - -import scala.concurrent.forkjoin.ForkJoinPool - -@Aspect -class DispatcherInstrumentation { - - @Pointcut("execution(akka.dispatch.Dispatchers.new(..)) && this(dispatchers) && cflow(execution(akka.actor.ActorSystemImpl.new(..)) && this(system))") - def onActorSystemStartup(dispatchers: Dispatchers, system: ActorSystemImpl) = {} - - @Before("onActorSystemStartup(dispatchers, system)") - def beforeActorSystemStartup(dispatchers: Dispatchers, system: ActorSystemImpl): Unit = { - val currentDispatchers = dispatchers.asInstanceOf[DispatchersWithActorSystem] - currentDispatchers.actorSystem = system - } - - @Pointcut("execution(* akka.dispatch.Dispatchers.lookup(..)) && this(dispatchers)") - def onDispatchersLookup(dispatchers: Dispatchers) = {} - - @AfterReturning(pointcut = "onDispatchersLookup(dispatchers)", returning = "dispatcher") - def afterReturningLookup(dispatchers: Dispatchers, dispatcher: Dispatcher): Unit = { - val dispatchersWithActorSystem = dispatchers.asInstanceOf[DispatchersWithActorSystem] - val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMetricCollectionInfo] - - dispatcherWithMetrics.actorSystem = dispatchersWithActorSystem.actorSystem - } - - @Pointcut("call(* akka.dispatch.ExecutorServiceFactory.createExecutorService(..))") - def onCreateExecutorService(): Unit = {} - - @Pointcut("cflow((execution(* akka.dispatch.MessageDispatcher.registerForExecution(..)) || execution(* akka.dispatch.MessageDispatcher.executeTask(..))) && this(dispatcher))") - def onCflowMessageDispatcher(dispatcher: Dispatcher): Unit = {} - - @Pointcut("onCreateExecutorService() && onCflowMessageDispatcher(dispatcher)") - def onDispatcherStartup(dispatcher: Dispatcher): Unit = {} - - @After("onDispatcherStartup(dispatcher)") - def afterDispatcherStartup(dispatcher: MessageDispatcher): Unit = { - - val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMetricCollectionInfo] - val metricsExtension = Kamon(Metrics)(dispatcherWithMetrics.actorSystem) - val metricIdentity = DispatcherMetrics(dispatcher.id) - - dispatcherWithMetrics.metricIdentity = metricIdentity - dispatcherWithMetrics.dispatcherMetricsRecorder = metricsExtension.register(metricIdentity, DispatcherMetrics.Factory) - - if (dispatcherWithMetrics.dispatcherMetricsRecorder.isDefined) { - dispatcherWithMetrics.dispatcherCollectorCancellable = metricsExtension.scheduleGaugeRecorder { - dispatcherWithMetrics.dispatcherMetricsRecorder.map { - dm ⇒ - val DispatcherMetricsMeasurement(maximumPoolSize, runningThreadCount, queueTaskCount, poolSize) = - DispatcherMetricsCollector.collect(dispatcher) - - dm.maximumPoolSize.record(maximumPoolSize) - dm.runningThreadCount.record(runningThreadCount) - dm.queueTaskCount.record(queueTaskCount) - dm.poolSize.record(poolSize) - } - } - } - } - - @Pointcut("execution(* akka.dispatch.MessageDispatcher.shutdown(..)) && this(dispatcher)") - def onDispatcherShutdown(dispatcher: MessageDispatcher): Unit = {} - - @After("onDispatcherShutdown(dispatcher)") - def afterDispatcherShutdown(dispatcher: MessageDispatcher): Unit = { - val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMetricCollectionInfo] - - dispatcherWithMetrics.dispatcherMetricsRecorder.map { - dispatcher ⇒ - dispatcherWithMetrics.dispatcherCollectorCancellable.cancel() - Kamon(Metrics)(dispatcherWithMetrics.actorSystem).unregister(dispatcherWithMetrics.metricIdentity) - } - } -} - -@Aspect -class DispatcherMetricCollectionInfoIntoDispatcherMixin { - - @DeclareMixin("akka.dispatch.MessageDispatcher") - def mixinDispatcherMetricsToMessageDispatcher: DispatcherMetricCollectionInfo = new DispatcherMetricCollectionInfo {} - - @DeclareMixin("akka.dispatch.Dispatchers") - def mixinDispatchersToDispatchersWithActorSystem: DispatchersWithActorSystem = new DispatchersWithActorSystem {} -} - -trait DispatcherMetricCollectionInfo { - var metricIdentity: DispatcherMetrics = _ - var dispatcherMetricsRecorder: Option[DispatcherMetricRecorder] = _ - var dispatcherCollectorCancellable: Cancellable = _ - var actorSystem: ActorSystemImpl = _ -} - -trait DispatchersWithActorSystem { - var actorSystem: ActorSystemImpl = _ -} - -object DispatcherMetricsCollector { - - case class DispatcherMetricsMeasurement(maximumPoolSize: Long, runningThreadCount: Long, queueTaskCount: Long, poolSize: Long) - - private def collectForkJoinMetrics(pool: ForkJoinPool): DispatcherMetricsMeasurement = { - DispatcherMetricsMeasurement(pool.getParallelism, pool.getActiveThreadCount, - (pool.getQueuedTaskCount + pool.getQueuedSubmissionCount), pool.getPoolSize) - } - - private def collectExecutorMetrics(pool: ThreadPoolExecutor): DispatcherMetricsMeasurement = { - DispatcherMetricsMeasurement(pool.getMaximumPoolSize, pool.getActiveCount, pool.getQueue.size(), pool.getPoolSize) - } - - private val executorServiceMethod: Method = { - // executorService is protected - val method = classOf[Dispatcher].getDeclaredMethod("executorService") - method.setAccessible(true) - method - } - - def collect(dispatcher: MessageDispatcher): DispatcherMetricsMeasurement = { - dispatcher match { - case x: Dispatcher ⇒ { - val executor = executorServiceMethod.invoke(x) match { - case delegate: ExecutorServiceDelegate ⇒ delegate.executor - case other ⇒ other - } - - executor match { - case fjp: ForkJoinPool ⇒ collectForkJoinMetrics(fjp) - case tpe: ThreadPoolExecutor ⇒ collectExecutorMetrics(tpe) - case anything ⇒ DispatcherMetricsMeasurement(0L, 0L, 0L, 0L) - } - } - case _ ⇒ new DispatcherMetricsMeasurement(0L, 0L, 0L, 0L) - } - } -} diff --git a/kamon-core/src/main/scala/kamon/metric/ActorMetrics.scala b/kamon-core/src/main/scala/kamon/metric/ActorMetrics.scala deleted file mode 100644 index d2cb4e38..00000000 --- a/kamon-core/src/main/scala/kamon/metric/ActorMetrics.scala +++ /dev/null @@ -1,93 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metric - -import akka.actor.ActorSystem -import com.typesafe.config.Config -import kamon.metric.instrument.{ MinMaxCounter, Counter, Histogram } - -case class ActorMetrics(name: String) extends MetricGroupIdentity { - val category = ActorMetrics -} - -object ActorMetrics extends MetricGroupCategory { - val name = "actor" - - case object ProcessingTime extends MetricIdentity { val name = "processing-time" } - case object MailboxSize extends MetricIdentity { val name = "mailbox-size" } - case object TimeInMailbox extends MetricIdentity { val name = "time-in-mailbox" } - case object Errors extends MetricIdentity { val name = "errors" } - - case class ActorMetricsRecorder(processingTime: Histogram, timeInMailbox: Histogram, mailboxSize: MinMaxCounter, - errors: Counter) extends MetricGroupRecorder { - - def collect(context: CollectionContext): ActorMetricSnapshot = - ActorMetricSnapshot( - processingTime.collect(context), - timeInMailbox.collect(context), - mailboxSize.collect(context), - errors.collect(context)) - - def cleanup: Unit = { - processingTime.cleanup - mailboxSize.cleanup - timeInMailbox.cleanup - errors.cleanup - } - } - - case class ActorMetricSnapshot(processingTime: Histogram.Snapshot, timeInMailbox: Histogram.Snapshot, - mailboxSize: Histogram.Snapshot, errors: Counter.Snapshot) extends MetricGroupSnapshot { - - type GroupSnapshotType = ActorMetricSnapshot - - def merge(that: ActorMetricSnapshot, context: CollectionContext): ActorMetricSnapshot = - ActorMetricSnapshot( - processingTime.merge(that.processingTime, context), - timeInMailbox.merge(that.timeInMailbox, context), - mailboxSize.merge(that.mailboxSize, context), - errors.merge(that.errors, context)) - - lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map( - (ProcessingTime -> processingTime), - (MailboxSize -> mailboxSize), - (TimeInMailbox -> timeInMailbox), - (Errors -> errors)) - } - - val Factory = ActorMetricGroupFactory -} - -case object ActorMetricGroupFactory extends MetricGroupFactory { - import ActorMetrics._ - - type GroupRecorder = ActorMetricsRecorder - - def create(config: Config, system: ActorSystem): ActorMetricsRecorder = { - val settings = config.getConfig("precision.actor") - - val processingTimeConfig = settings.getConfig("processing-time") - val timeInMailboxConfig = settings.getConfig("time-in-mailbox") - val mailboxSizeConfig = settings.getConfig("mailbox-size") - - new ActorMetricsRecorder( - Histogram.fromConfig(processingTimeConfig), - Histogram.fromConfig(timeInMailboxConfig), - MinMaxCounter.fromConfig(mailboxSizeConfig, system), - Counter()) - } -} diff --git a/kamon-core/src/main/scala/kamon/metric/DispatcherMetrics.scala b/kamon-core/src/main/scala/kamon/metric/DispatcherMetrics.scala deleted file mode 100644 index 126f6333..00000000 --- a/kamon-core/src/main/scala/kamon/metric/DispatcherMetrics.scala +++ /dev/null @@ -1,93 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metric - -import akka.actor.ActorSystem -import com.typesafe.config.Config -import kamon.metric.instrument.{ Histogram, HdrHistogram } - -case class DispatcherMetrics(name: String) extends MetricGroupIdentity { - val category = DispatcherMetrics -} - -object DispatcherMetrics extends MetricGroupCategory { - val name = "dispatcher" - - case object MaximumPoolSize extends MetricIdentity { val name = "maximum-pool-size" } - case object RunningThreadCount extends MetricIdentity { val name = "running-thread-count" } - case object QueueTaskCount extends MetricIdentity { val name = "queued-task-count" } - case object PoolSize extends MetricIdentity { val name = "pool-size" } - - case class DispatcherMetricRecorder(maximumPoolSize: Histogram, runningThreadCount: Histogram, - queueTaskCount: Histogram, poolSize: Histogram) - extends MetricGroupRecorder { - - def collect(context: CollectionContext): MetricGroupSnapshot = - DispatcherMetricSnapshot( - maximumPoolSize.collect(context), - runningThreadCount.collect(context), - queueTaskCount.collect(context), - poolSize.collect(context)) - - def cleanup: Unit = {} - - } - - case class DispatcherMetricSnapshot(maximumPoolSize: Histogram.Snapshot, runningThreadCount: Histogram.Snapshot, - queueTaskCount: Histogram.Snapshot, poolSize: Histogram.Snapshot) extends MetricGroupSnapshot { - - type GroupSnapshotType = DispatcherMetricSnapshot - - def merge(that: DispatcherMetricSnapshot, context: CollectionContext): DispatcherMetricSnapshot = - DispatcherMetricSnapshot( - maximumPoolSize.merge(that.maximumPoolSize, context), - runningThreadCount.merge(that.runningThreadCount, context), - queueTaskCount.merge(that.queueTaskCount, context), - poolSize.merge(that.poolSize, context)) - - lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map( - (MaximumPoolSize -> maximumPoolSize), - (RunningThreadCount -> runningThreadCount), - (QueueTaskCount -> queueTaskCount), - (PoolSize -> poolSize)) - } - - val Factory = DispatcherMetricGroupFactory -} - -case object DispatcherMetricGroupFactory extends MetricGroupFactory { - - import DispatcherMetrics._ - - type GroupRecorder = DispatcherMetricRecorder - - def create(config: Config, system: ActorSystem): DispatcherMetricRecorder = { - val settings = config.getConfig("precision.dispatcher") - - val maximumPoolSizeConfig = settings.getConfig("maximum-pool-size") - val runningThreadCountConfig = settings.getConfig("running-thread-count") - val queueTaskCountConfig = settings.getConfig("queued-task-count") - val poolSizeConfig = settings.getConfig("pool-size") - - new DispatcherMetricRecorder( - Histogram.fromConfig(maximumPoolSizeConfig), - Histogram.fromConfig(runningThreadCountConfig), - Histogram.fromConfig(queueTaskCountConfig), - Histogram.fromConfig(poolSizeConfig)) - } - -} diff --git a/kamon-core/src/main/scala/kamon/metric/RouterMetrics.scala b/kamon-core/src/main/scala/kamon/metric/RouterMetrics.scala deleted file mode 100644 index ddfef416..00000000 --- a/kamon-core/src/main/scala/kamon/metric/RouterMetrics.scala +++ /dev/null @@ -1,82 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ -package kamon.metric - -import akka.actor.ActorSystem -import com.typesafe.config.Config -import kamon.metric.instrument.{ Counter, Histogram } - -case class RouterMetrics(name: String) extends MetricGroupIdentity { - val category = RouterMetrics -} - -object RouterMetrics extends MetricGroupCategory { - val name = "router" - - case object ProcessingTime extends MetricIdentity { val name = "processing-time" } - case object TimeInMailbox extends MetricIdentity { val name = "time-in-mailbox" } - case object Errors extends MetricIdentity { val name = "errors" } - - case class RouterMetricsRecorder(processingTime: Histogram, timeInMailbox: Histogram, errors: Counter) extends MetricGroupRecorder { - - def collect(context: CollectionContext): RouterMetricSnapshot = - RouterMetricSnapshot(processingTime.collect(context), timeInMailbox.collect(context), errors.collect(context)) - - def cleanup: Unit = { - processingTime.cleanup - timeInMailbox.cleanup - errors.cleanup - } - } - - case class RouterMetricSnapshot(processingTime: Histogram.Snapshot, timeInMailbox: Histogram.Snapshot, errors: Counter.Snapshot) extends MetricGroupSnapshot { - - type GroupSnapshotType = RouterMetricSnapshot - - def merge(that: RouterMetricSnapshot, context: CollectionContext): RouterMetricSnapshot = - RouterMetricSnapshot( - processingTime.merge(that.processingTime, context), - timeInMailbox.merge(that.timeInMailbox, context), - errors.merge(that.errors, context)) - - lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map( - ProcessingTime -> processingTime, - TimeInMailbox -> timeInMailbox, - Errors -> errors) - } - - val Factory = RouterMetricGroupFactory -} - -case object RouterMetricGroupFactory extends MetricGroupFactory { - - import RouterMetrics._ - - type GroupRecorder = RouterMetricsRecorder - - def create(config: Config, system: ActorSystem): RouterMetricsRecorder = { - val settings = config.getConfig("precision.router") - - val processingTimeConfig = settings.getConfig("processing-time") - val timeInMailboxConfig = settings.getConfig("time-in-mailbox") - - new RouterMetricsRecorder( - Histogram.fromConfig(processingTimeConfig), - Histogram.fromConfig(timeInMailboxConfig), - Counter()) - } -} - diff --git a/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorCellInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorCellInstrumentationSpec.scala deleted file mode 100644 index 0f682500..00000000 --- a/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorCellInstrumentationSpec.scala +++ /dev/null @@ -1,87 +0,0 @@ -/* =================================================== - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ -package kamon.instrumentation.akka - -import akka.actor.{ Actor, ActorSystem, Props } -import akka.pattern.{ ask, pipe } -import akka.routing.RoundRobinPool -import akka.testkit.{ ImplicitSender, TestKit } -import akka.util.Timeout -import kamon.trace.TraceRecorder -import org.scalatest.WordSpecLike - -import scala.concurrent.duration._ - -class ActorCellInstrumentationSpec extends TestKit(ActorSystem("actor-cell-instrumentation-spec")) with WordSpecLike - with ImplicitSender { - - implicit val executionContext = system.dispatcher - - "the message passing instrumentation" should { - "propagate the TraceContext using bang" in new EchoActorFixture { - val testTraceContext = TraceRecorder.withNewTraceContext("bang-reply") { - ctxEchoActor ! "test" - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) - } - - "propagate the TraceContext using tell" in new EchoActorFixture { - val testTraceContext = TraceRecorder.withNewTraceContext("tell-reply") { - ctxEchoActor.tell("test", testActor) - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) - } - - "propagate the TraceContext using ask" in new EchoActorFixture { - implicit val timeout = Timeout(1 seconds) - val testTraceContext = TraceRecorder.withNewTraceContext("ask-reply") { - // The pipe pattern use Futures internally, so FutureTracing test should cover the underpinnings of it. - (ctxEchoActor ? "test") pipeTo (testActor) - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) - } - - "propagate the TraceContext to actors behind a router" in new RoutedEchoActorFixture { - val testTraceContext = TraceRecorder.withNewTraceContext("router-reply") { - ctxEchoActor ! "test" - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) - } - } - - trait EchoActorFixture { - val ctxEchoActor = system.actorOf(Props[TraceContextEcho]) - } - - trait RoutedEchoActorFixture extends EchoActorFixture { - override val ctxEchoActor = system.actorOf(Props[TraceContextEcho].withRouter(RoundRobinPool(nrOfInstances = 1))) - } -} - -class TraceContextEcho extends Actor { - def receive = { - case msg: String ⇒ sender ! TraceRecorder.currentContext - } -} - diff --git a/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorLoggingInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorLoggingInstrumentationSpec.scala deleted file mode 100644 index 4b114d4f..00000000 --- a/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorLoggingInstrumentationSpec.scala +++ /dev/null @@ -1,73 +0,0 @@ -/* =================================================== - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ -package kamon.instrumentation.akka - -import akka.actor.{ Actor, ActorLogging, ActorSystem, Props } -import akka.event.Logging.LogEvent -import akka.testkit.TestKit -import kamon.trace.TraceLocal.AvailableToMdc -import kamon.trace.logging.MdcKeysSupport -import kamon.trace.{ TraceLocal, TraceContextAware, TraceRecorder } -import org.scalatest.{ Inspectors, Matchers, WordSpecLike } -import org.slf4j.MDC - -class ActorLoggingInstrumentationSpec extends TestKit(ActorSystem("actor-logging-instrumentation-spec")) with WordSpecLike - with Matchers with Inspectors with MdcKeysSupport { - - "the ActorLogging instrumentation" should { - "attach the TraceContext (if available) to log events" in { - val loggerActor = system.actorOf(Props[LoggerActor]) - system.eventStream.subscribe(testActor, classOf[LogEvent]) - - val testTraceContext = TraceRecorder.withNewTraceContext("logging") { - loggerActor ! "info" - TraceRecorder.currentContext - } - - fishForMessage() { - case event: LogEvent if event.message.toString contains "TraceContext =>" ⇒ - val ctxInEvent = event.asInstanceOf[TraceContextAware].traceContext - ctxInEvent === testTraceContext - - case event: LogEvent ⇒ false - } - } - - "allow retrieve a value from the MDC when was created a key of type AvailableToMdc" in { - val testString = "Hello World" - val SampleTraceLocalKeyAvailableToMDC = AvailableToMdc("some-cool-key") - - val loggerActor = system.actorOf(Props[LoggerActor]) - system.eventStream.subscribe(testActor, classOf[LogEvent]) - - TraceRecorder.withNewTraceContext("logging-with-mdc") { - TraceLocal.store(SampleTraceLocalKeyAvailableToMDC)(testString) - - loggerActor ! "info" - - withMdc { - MDC.get("some-cool-key") should equal(testString) - } - } - } - } -} - -class LoggerActor extends Actor with ActorLogging { - def receive = { - case "info" ⇒ log.info("TraceContext => {}", TraceRecorder.currentContext) - } -} diff --git a/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentationSpec.scala deleted file mode 100644 index d79ccbe0..00000000 --- a/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentationSpec.scala +++ /dev/null @@ -1,172 +0,0 @@ -package kamon.instrumentation.akka - -import akka.actor.SupervisorStrategy.{ Escalate, Restart, Resume, Stop } -import akka.actor._ -import akka.testkit.{ ImplicitSender, TestKit } -import kamon.trace.{ EmptyTraceContext, TraceRecorder } -import org.scalatest.WordSpecLike - -import scala.concurrent.duration._ -import scala.util.control.NonFatal - -class ActorSystemMessageInstrumentationSpec extends TestKit(ActorSystem("actor-system-message-instrumentation-spec")) - with WordSpecLike with ImplicitSender { - - implicit val executionContext = system.dispatcher - - "the system message passing instrumentation" should { - "keep the TraceContext while processing the Create message in top level actors" in { - val testTraceContext = TraceRecorder.withNewTraceContext("creating-top-level-actor") { - system.actorOf(Props(new Actor { - testActor ! TraceRecorder.currentContext - def receive: Actor.Receive = { case any ⇒ } - })) - - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) - } - - "keep the TraceContext while processing the Create message in non top level actors" in { - val testTraceContext = TraceRecorder.withNewTraceContext("creating-non-top-level-actor") { - system.actorOf(Props(new Actor { - def receive: Actor.Receive = { - case any ⇒ - context.actorOf(Props(new Actor { - testActor ! TraceRecorder.currentContext - def receive: Actor.Receive = { case any ⇒ } - })) - } - })) ! "any" - - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) - } - - "keep the TraceContext in the supervision cycle" when { - "the actor is resumed" in { - val supervisor = supervisorWithDirective(Resume) - - val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-resume") { - supervisor ! "fail" - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) // From the parent executing the supervision strategy - - // Ensure we didn't tie the actor with the context - supervisor ! "context" - expectMsg(EmptyTraceContext) - } - - "the actor is restarted" in { - val supervisor = supervisorWithDirective(Restart, sendPreRestart = true, sendPostRestart = true) - - val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-restart") { - supervisor ! "fail" - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) // From the parent executing the supervision strategy - expectMsg(testTraceContext) // From the preRestart hook - expectMsg(testTraceContext) // From the postRestart hook - - // Ensure we didn't tie the actor with the context - supervisor ! "context" - expectMsg(EmptyTraceContext) - } - - "the actor is stopped" in { - val supervisor = supervisorWithDirective(Stop, sendPostStop = true) - - val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-stop") { - supervisor ! "fail" - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) // From the parent executing the supervision strategy - expectMsg(testTraceContext) // From the postStop hook - expectNoMsg(1 second) - } - - "the failure is escalated" in { - val supervisor = supervisorWithDirective(Escalate, sendPostStop = true) - - val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-escalate") { - supervisor ! "fail" - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) // From the parent executing the supervision strategy - expectMsg(testTraceContext) // From the grandparent executing the supervision strategy - expectMsg(testTraceContext) // From the postStop hook in the child - expectMsg(testTraceContext) // From the postStop hook in the parent - expectNoMsg(1 second) - } - } - } - - def supervisorWithDirective(directive: SupervisorStrategy.Directive, sendPreRestart: Boolean = false, sendPostRestart: Boolean = false, - sendPostStop: Boolean = false, sendPreStart: Boolean = false): ActorRef = { - class GrandParent extends Actor { - val child = context.actorOf(Props(new Parent)) - - override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() { - case NonFatal(throwable) ⇒ testActor ! TraceRecorder.currentContext; Stop - } - - def receive = { - case any ⇒ child forward any - } - } - - class Parent extends Actor { - val child = context.actorOf(Props(new Child)) - - override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() { - case NonFatal(throwable) ⇒ testActor ! TraceRecorder.currentContext; directive - } - - def receive: Actor.Receive = { - case any ⇒ child forward any - } - - override def postStop(): Unit = { - if (sendPostStop) testActor ! TraceRecorder.currentContext - super.postStop() - } - } - - class Child extends Actor { - def receive = { - case "fail" ⇒ throw new ArithmeticException("Division by zero.") - case "context" ⇒ sender ! TraceRecorder.currentContext - } - - override def preRestart(reason: Throwable, message: Option[Any]): Unit = { - if (sendPreRestart) testActor ! TraceRecorder.currentContext - super.preRestart(reason, message) - } - - override def postRestart(reason: Throwable): Unit = { - if (sendPostRestart) testActor ! TraceRecorder.currentContext - super.postRestart(reason) - } - - override def postStop(): Unit = { - if (sendPostStop) testActor ! TraceRecorder.currentContext - super.postStop() - } - - override def preStart(): Unit = { - if (sendPreStart) testActor ! TraceRecorder.currentContext - super.preStart() - } - } - - system.actorOf(Props(new GrandParent)) - } -} diff --git a/kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala deleted file mode 100644 index 471cbd4d..00000000 --- a/kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.instrumentation.akka - -import akka.actor.{ Actor, ActorSystem, Props } -import akka.event.Logging.Warning -import akka.pattern.ask -import akka.testkit.TestKitBase -import akka.util.Timeout -import com.typesafe.config.ConfigFactory -import kamon.trace.{ TraceContextAware, TraceRecorder } -import org.scalatest.{ Matchers, WordSpecLike } - -import scala.concurrent.duration._ - -class AskPatternInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers { - implicit lazy val system: ActorSystem = ActorSystem("ask-pattern-tracing-spec", ConfigFactory.parseString( - """ - |kamon { - | akka { - | ask-pattern-timeout-warning = heavyweight - | } - |} - """.stripMargin)) - - "the AskPatternTracing" should { - "log a warning with a stack trace and TraceContext taken from the moment the ask was triggered" in { - implicit val ec = system.dispatcher - implicit val timeout = Timeout(10 milliseconds) - val noReply = system.actorOf(Props[NoReply], "NoReply") - system.eventStream.subscribe(testActor, classOf[Warning]) - - val testTraceContext = TraceRecorder.withNewTraceContext("ask-timeout-warning") { - noReply ? "hello" - TraceRecorder.currentContext - } - - val warn = expectMsgPF() { - case warn: Warning if warn.message.toString.contains("Timeout triggered for ask pattern") ⇒ warn - } - val capturedCtx = warn.asInstanceOf[TraceContextAware].traceContext - - capturedCtx should equal(testTraceContext) - } - } -} - -class NoReply extends Actor { - def receive = { - case any ⇒ - } -} diff --git a/kamon-core/src/test/scala/kamon/metric/ActorMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metric/ActorMetricsSpec.scala deleted file mode 100644 index 97bcb0cf..00000000 --- a/kamon-core/src/test/scala/kamon/metric/ActorMetricsSpec.scala +++ /dev/null @@ -1,224 +0,0 @@ -/* ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metric - -import java.nio.LongBuffer - -import akka.kamon.instrumentation.ActorCellMetrics -import kamon.Kamon -import kamon.metric.ActorMetricsTestActor._ -import kamon.metric.instrument.Histogram.MutableRecord -import org.scalatest.{ WordSpecLike, Matchers } -import akka.testkit.{ ImplicitSender, TestProbe, TestKitBase } -import akka.actor._ -import com.typesafe.config.ConfigFactory -import scala.concurrent.duration._ -import kamon.metric.Subscriptions.TickMetricSnapshot -import kamon.metric.ActorMetrics.{ ActorMetricsRecorder, ActorMetricSnapshot } - -class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender { - implicit lazy val system: ActorSystem = ActorSystem("actor-metrics-spec", ConfigFactory.parseString( - """ - |kamon.metrics { - | tick-interval = 1 hour - | default-collection-context-buffer-size = 10 - | - | filters = [ - | { - | actor { - | includes = [ "user/tracked-*", "user/measuring-*", "user/clean-after-collect", "user/stop" ] - | excludes = [ "user/tracked-explicitly-excluded"] - | } - | } - | ] - | precision.actor { - | processing-time { - | highest-trackable-value = 3600000000000 - | significant-value-digits = 2 - | } - | - | time-in-mailbox { - | highest-trackable-value = 3600000000000 - | significant-value-digits = 2 - | } - | - | mailbox-size { - | refresh-interval = 1 hour - | highest-trackable-value = 999999999 - | significant-value-digits = 2 - | } - | } - |} - """.stripMargin)) - - "the Kamon actor metrics" should { - "respect the configured include and exclude filters" in new ActorMetricsFixtures { - val trackedActor = createTestActor("tracked-actor") - actorMetricsRecorderOf(trackedActor) should not be empty - - val nonTrackedActor = createTestActor("non-tracked-actor") - actorMetricsRecorderOf(nonTrackedActor) shouldBe empty - - val trackedButExplicitlyExcluded = createTestActor("tracked-explicitly-excluded") - actorMetricsRecorderOf(trackedButExplicitlyExcluded) shouldBe empty - } - - "reset all recording instruments after taking a snapshot" in new ActorMetricsFixtures { - val trackedActor = createTestActor("clean-after-collect") - val trackedActorMetrics = actorMetricsRecorderOf(trackedActor).get - for (i ← 1 to 100) { - trackedActor ! Discard - } - trackedActor ! Fail - trackedActor ! TrackTimings(sleep = Some(1 second)) - expectMsgType[TrackedTimings] - - val firstSnapshot = takeSnapshotOf(trackedActorMetrics) - firstSnapshot.errors.count should be(1L) - firstSnapshot.mailboxSize.numberOfMeasurements should be > 0L - firstSnapshot.processingTime.numberOfMeasurements should be(103L) // 102 examples + Initialize message - firstSnapshot.timeInMailbox.numberOfMeasurements should be(103L) // 102 examples + Initialize message - - val secondSnapshot = takeSnapshotOf(trackedActorMetrics) // Ensure that the recorders are clean - secondSnapshot.errors.count should be(0L) - secondSnapshot.mailboxSize.numberOfMeasurements should be(3L) // min, max and current - secondSnapshot.processingTime.numberOfMeasurements should be(0L) - secondSnapshot.timeInMailbox.numberOfMeasurements should be(0L) - } - - "record the processing-time of the receive function" in new ActorMetricsFixtures { - val trackedActor = createTestActor("measuring-processing-time") - val trackedActorMetrics = actorMetricsRecorderOf(trackedActor).get - takeSnapshotOf(trackedActorMetrics) // Ensure that the recorders are clean - - trackedActor ! TrackTimings(sleep = Some(1 second)) - val timings = expectMsgType[TrackedTimings] - val snapshot = takeSnapshotOf(trackedActorMetrics) - - snapshot.processingTime.numberOfMeasurements should be(1L) - snapshot.processingTime.recordsIterator.next().count should be(1L) - snapshot.processingTime.recordsIterator.next().level should be(timings.approximateProcessingTime +- 10.millis.toNanos) - } - - "record the number of errors" in new ActorMetricsFixtures { - val trackedActor = createTestActor("measuring-errors") - val trackedActorMetrics = actorMetricsRecorderOf(trackedActor).get - takeSnapshotOf(trackedActorMetrics) // Ensure that the recorders are clean - - for (i ← 1 to 10) { trackedActor ! Fail } - trackedActor ! Ping - expectMsg(Pong) - val snapshot = takeSnapshotOf(trackedActorMetrics) - - snapshot.errors.count should be(10) - } - - "record the mailbox-size" in new ActorMetricsFixtures { - val trackedActor = createTestActor("measuring-mailbox-size") - val trackedActorMetrics = actorMetricsRecorderOf(trackedActor).get - takeSnapshotOf(trackedActorMetrics) // Ensure that the recorders are clean - - trackedActor ! TrackTimings(sleep = Some(1 second)) - for (i ← 1 to 10) { - trackedActor ! Discard - } - trackedActor ! Ping - - val timings = expectMsgType[TrackedTimings] - expectMsg(Pong) - val snapshot = takeSnapshotOf(trackedActorMetrics) - - snapshot.mailboxSize.min should be(0L) - snapshot.mailboxSize.max should be(11L +- 1L) - } - - "record the time-in-mailbox" in new ActorMetricsFixtures { - val trackedActor = createTestActor("measuring-time-in-mailbox") - val trackedActorMetrics = actorMetricsRecorderOf(trackedActor).get - takeSnapshotOf(trackedActorMetrics) // Ensure that the recorders are clean - - trackedActor ! TrackTimings(sleep = Some(1 second)) - val timings = expectMsgType[TrackedTimings] - val snapshot = takeSnapshotOf(trackedActorMetrics) - - snapshot.timeInMailbox.numberOfMeasurements should be(1L) - snapshot.timeInMailbox.recordsIterator.next().count should be(1L) - snapshot.timeInMailbox.recordsIterator.next().level should be(timings.approximateTimeInMailbox +- 10.millis.toNanos) - } - - "clean up the associated recorder when the actor is stopped" in new ActorMetricsFixtures { - val trackedActor = createTestActor("stop") - actorMetricsRecorderOf(trackedActor).get // force the actor to be initialized - Kamon(Metrics).storage.get(ActorMetrics("user/stop")) should not be empty - - val deathWatcher = TestProbe() - deathWatcher.watch(trackedActor) - trackedActor ! PoisonPill - deathWatcher.expectTerminated(trackedActor) - - Kamon(Metrics).storage.get(ActorMetrics("user/stop")) shouldBe empty - } - } - - trait ActorMetricsFixtures { - val collectionContext = new CollectionContext { - val buffer: LongBuffer = LongBuffer.allocate(10000) - } - - def actorMetricsRecorderOf(ref: ActorRef): Option[ActorMetricsRecorder] = { - val initialisationListener = TestProbe() - ref.tell(Ping, initialisationListener.ref) - initialisationListener.expectMsg(Pong) - - val underlyingCellField = ref.getClass.getDeclaredMethod("underlying") - val cell = underlyingCellField.invoke(ref).asInstanceOf[ActorCellMetrics] - - cell.actorMetricsRecorder - } - - def createTestActor(name: String): ActorRef = system.actorOf(Props[ActorMetricsTestActor], name) - - def takeSnapshotOf(amr: ActorMetricsRecorder): ActorMetricSnapshot = amr.collect(collectionContext) - } -} - -class ActorMetricsTestActor extends Actor { - def receive = { - case Discard ⇒ - case Fail ⇒ throw new ArithmeticException("Division by zero.") - case Ping ⇒ sender ! Pong - case TrackTimings(sendTimestamp, sleep) ⇒ { - val dequeueTimestamp = System.nanoTime() - sleep.map(s ⇒ Thread.sleep(s.toMillis)) - val afterReceiveTimestamp = System.nanoTime() - - sender ! TrackedTimings(sendTimestamp, dequeueTimestamp, afterReceiveTimestamp) - } - } -} - -object ActorMetricsTestActor { - case object Ping - case object Pong - case object Fail - case object Discard - - case class TrackTimings(sendTimestamp: Long = System.nanoTime(), sleep: Option[Duration] = None) - case class TrackedTimings(sendTimestamp: Long, dequeueTimestamp: Long, afterReceiveTimestamp: Long) { - def approximateTimeInMailbox: Long = dequeueTimestamp - sendTimestamp - def approximateProcessingTime: Long = afterReceiveTimestamp - dequeueTimestamp - } -} diff --git a/kamon-core/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala deleted file mode 100644 index ae324b73..00000000 --- a/kamon-core/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala +++ /dev/null @@ -1,108 +0,0 @@ -/* ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metric - -import org.scalatest.{ WordSpecLike, Matchers } -import akka.testkit.{ TestProbe, TestKitBase } -import akka.actor.{ ActorRef, Props, ActorSystem } -import com.typesafe.config.ConfigFactory -import scala.concurrent.duration._ -import kamon.Kamon -import kamon.metric.Subscriptions.TickMetricSnapshot -import kamon.metric.DispatcherMetrics.DispatcherMetricSnapshot - -class DispatcherMetricsSpec extends TestKitBase with WordSpecLike with Matchers { - implicit lazy val system: ActorSystem = ActorSystem("dispatcher-metrics-spec", ConfigFactory.parseString( - """ - |kamon.metrics { - | tick-interval = 1 second - | default-collection-context-buffer-size = 10 - | - | filters = [ - | { - | dispatcher { - | includes = ["*"] - | excludes = ["dispatcher-explicitly-excluded"] - | } - | } - | ] - |} - | - |dispatcher-explicitly-excluded { - | type = "Dispatcher" - | executor = "fork-join-executor" - |} - | - |tracked-dispatcher { - | type = "Dispatcher" - | executor = "thread-pool-executor" - |} - | - """.stripMargin)) - - "the Kamon dispatcher metrics" should { - "respect the configured include and exclude filters" in { - system.actorOf(Props[ActorMetricsTestActor].withDispatcher("tracked-dispatcher"), "actor-with-tracked-dispatcher") - system.actorOf(Props[ActorMetricsTestActor].withDispatcher("dispatcher-explicitly-excluded"), "actor-with-excluded-dispatcher") - - Kamon(Metrics).subscribe(DispatcherMetrics, "*", testActor, permanently = true) - expectMsgType[TickMetricSnapshot] - - within(2 seconds) { - val tickSnapshot = expectMsgType[TickMetricSnapshot] - tickSnapshot.metrics.keys should contain(DispatcherMetrics("tracked-dispatcher")) - tickSnapshot.metrics.keys should not contain (DispatcherMetrics("dispatcher-explicitly-excluded")) - } - } - - "record maximumPoolSize, runningThreadCount, queueTaskCount, poolSize metrics" in new DelayableActorFixture { - val (delayable, metricsListener) = delayableActor("worker-actor", "tracked-dispatcher") - - for (_ ← 1 to 100) { - //delayable ! Discard - } - - val dispatcherMetrics = expectDispatcherMetrics("tracked-dispatcher", metricsListener, 3 seconds) - dispatcherMetrics.maximumPoolSize.max should be <= 64L //fail in travis - dispatcherMetrics.poolSize.max should be <= 22L //fail in travis - dispatcherMetrics.queueTaskCount.max should be(0L) - dispatcherMetrics.runningThreadCount.max should be(0L) - } - - } - - def expectDispatcherMetrics(dispatcherId: String, listener: TestProbe, waitTime: FiniteDuration): DispatcherMetricSnapshot = { - val tickSnapshot = within(waitTime) { - listener.expectMsgType[TickMetricSnapshot] - } - val dispatcherMetricsOption = tickSnapshot.metrics.get(DispatcherMetrics(dispatcherId)) - dispatcherMetricsOption should not be empty - dispatcherMetricsOption.get.asInstanceOf[DispatcherMetricSnapshot] - } - - trait DelayableActorFixture { - def delayableActor(name: String, dispatcher: String): (ActorRef, TestProbe) = { - val actor = system.actorOf(Props[ActorMetricsTestActor].withDispatcher(dispatcher), name) - val metricsListener = TestProbe() - - Kamon(Metrics).subscribe(DispatcherMetrics, "*", metricsListener.ref, permanently = true) - // Wait for one empty snapshot before proceeding to the test. - metricsListener.expectMsgType[TickMetricSnapshot] - - (actor, metricsListener) - } - } -} diff --git a/kamon-core/src/test/scala/kamon/metric/RouterMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metric/RouterMetricsSpec.scala deleted file mode 100644 index ebc43091..00000000 --- a/kamon-core/src/test/scala/kamon/metric/RouterMetricsSpec.scala +++ /dev/null @@ -1,161 +0,0 @@ -/* ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metric - -import java.nio.LongBuffer - -import akka.actor._ -import akka.routing.RoundRobinPool -import akka.testkit.{ TestProbe, ImplicitSender, TestKitBase } -import com.typesafe.config.ConfigFactory -import kamon.Kamon -import kamon.metric.RouterMetrics._ -import kamon.metric.RouterMetricsTestActor._ -import kamon.metric.Subscriptions.TickMetricSnapshot -import kamon.metric.instrument.{ Counter, Histogram } -import org.scalatest.{ Matchers, WordSpecLike } - -import scala.concurrent.duration._ - -class RouterMetricsSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender { - implicit lazy val system: ActorSystem = ActorSystem("router-metrics-spec", ConfigFactory.parseString( - """ - |kamon.metrics { - | tick-interval = 1 second - | default-collection-context-buffer-size = 10 - | - | filters = [ - | { - | router { - | includes = [ "user/tracked-*", "user/measuring-*", "user/stop" ] - | excludes = [ "user/tracked-explicitly-excluded"] - | } - | } - | ] - | precision { - | default-histogram-precision { - | highest-trackable-value = 3600000000000 - | significant-value-digits = 2 - | } - | } - |} - """.stripMargin)) - - "the Kamon router metrics" should { - "respect the configured include and exclude filters" in new RouterMetricsFixtures { - createTestRouter("tracked-router") - createTestRouter("non-tracked-router") - createTestRouter("tracked-explicitly-excluded") - - Kamon(Metrics).subscribe(RouterMetrics, "*", testActor, permanently = true) - expectMsgType[TickMetricSnapshot] - - within(2 seconds) { - val tickSnapshot = expectMsgType[TickMetricSnapshot] - tickSnapshot.metrics.keys should contain(RouterMetrics("user/tracked-router")) - tickSnapshot.metrics.keys should not contain (RouterMetrics("user/non-tracked-router")) - tickSnapshot.metrics.keys should not contain (RouterMetrics("user/tracked-explicitly-excluded")) - } - } - - "record the processing-time of the receive function" in new RouterMetricsFixtures { - val metricsListener = TestProbe() - val trackedRouter = createTestRouter("measuring-processing-time") - - trackedRouter.tell(RouterTrackTimings(sleep = Some(1 second)), metricsListener.ref) - val timings = metricsListener.expectMsgType[RouterTrackedTimings] - - val tickSnapshot = expectMsgType[TickMetricSnapshot].metrics - tickSnapshot(RouterMetrics("user/measuring-processing-time")).metrics(ProcessingTime).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(1L) - tickSnapshot(RouterMetrics("user/measuring-processing-time")).metrics(ProcessingTime).asInstanceOf[Histogram.Snapshot].recordsIterator.next().count should be(1L) - // tickSnapshot(RouterMetrics("user/measuring-processing-time")).metrics(ProcessingTime).asInstanceOf[Histogram.Snapshot].recordsIterator.next().level should be(timings.approximateProcessingTime +- 10.millis.toNanos) - } - - "record the number of errors" in new RouterMetricsFixtures { - val metricsListener = TestProbe() - val trackedRouter = createTestRouter("measuring-errors") - - for (i ← 1 to 10) { - trackedRouter.tell(Fail, metricsListener.ref) - } - val tickSnapshot = expectMsgType[TickMetricSnapshot].metrics - tickSnapshot(RouterMetrics("user/measuring-errors")).metrics(Errors).asInstanceOf[Counter.Snapshot].count should be(10L) - } - - "record the time-in-mailbox" in new RouterMetricsFixtures { - val metricsListener = TestProbe() - val trackedRouter = createTestRouter("measuring-time-in-mailbox") - - trackedRouter.tell(RouterTrackTimings(sleep = Some(1 second)), metricsListener.ref) - val timings = metricsListener.expectMsgType[RouterTrackedTimings] - val tickSnapshot = expectMsgType[TickMetricSnapshot].metrics - - tickSnapshot(RouterMetrics("user/measuring-time-in-mailbox")).metrics(TimeInMailbox).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(1L) - tickSnapshot(RouterMetrics("user/measuring-time-in-mailbox")).metrics(TimeInMailbox).asInstanceOf[Histogram.Snapshot].recordsIterator.next().count should be(1L) - tickSnapshot(RouterMetrics("user/measuring-time-in-mailbox")).metrics(TimeInMailbox).asInstanceOf[Histogram.Snapshot].recordsIterator.next().level should be(timings.approximateTimeInMailbox +- 10.millis.toNanos) - } - - "clean up the associated recorder when the actor is stopped" in new RouterMetricsFixtures { - val trackedRouter = createTestRouter("stop") - trackedRouter ! Ping - Kamon(Metrics).storage.toString() // force to be initialized - Kamon(Metrics).storage.get(RouterMetrics("user/stop")) should not be empty - - val deathWatcher = TestProbe() - deathWatcher.watch(trackedRouter) - trackedRouter ! PoisonPill - deathWatcher.expectTerminated(trackedRouter) - - Kamon(Metrics).storage.get(RouterMetrics("user/stop")) shouldBe empty - } - } - - trait RouterMetricsFixtures { - val collectionContext = new CollectionContext { - val buffer: LongBuffer = LongBuffer.allocate(10000) - } - - def createTestRouter(name: String): ActorRef = system.actorOf(RoundRobinPool(5).props(Props[RouterMetricsTestActor]), name) - } -} - -class RouterMetricsTestActor extends Actor { - def receive = { - case Discard ⇒ - case Fail ⇒ throw new ArithmeticException("Division by zero.") - case Ping ⇒ sender ! Pong - case RouterTrackTimings(sendTimestamp, sleep) ⇒ { - val dequeueTimestamp = System.nanoTime() - sleep.map(s ⇒ Thread.sleep(s.toMillis)) - val afterReceiveTimestamp = System.nanoTime() - - sender ! RouterTrackedTimings(sendTimestamp, dequeueTimestamp, afterReceiveTimestamp) - } - } -} - -object RouterMetricsTestActor { - case object Ping - case object Pong - case object Fail - case object Discard - - case class RouterTrackTimings(sendTimestamp: Long = System.nanoTime(), sleep: Option[Duration] = None) - case class RouterTrackedTimings(sendTimestamp: Long, dequeueTimestamp: Long, afterReceiveTimestamp: Long) { - def approximateTimeInMailbox: Long = dequeueTimestamp - sendTimestamp - def approximateProcessingTime: Long = afterReceiveTimestamp - dequeueTimestamp - } -} -- cgit v1.2.3