diff options
Diffstat (limited to 'kamon-akka/src/main/scala/kamon/akka')
4 files changed, 309 insertions, 0 deletions
diff --git a/kamon-akka/src/main/scala/kamon/akka/ActorMetrics.scala b/kamon-akka/src/main/scala/kamon/akka/ActorMetrics.scala new file mode 100644 index 00000000..b22f7fa9 --- /dev/null +++ b/kamon-akka/src/main/scala/kamon/akka/ActorMetrics.scala @@ -0,0 +1,94 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.akka + +import akka.actor.ActorSystem +import com.typesafe.config.Config +import kamon.metric._ +import kamon.metric.instrument.{ Counter, Histogram, MinMaxCounter } + +case class ActorMetrics(name: String) extends MetricGroupIdentity { + val category = ActorMetrics +} + +object ActorMetrics extends MetricGroupCategory { + val name = "actor" + + case object ProcessingTime extends MetricIdentity { val name = "processing-time" } + case object MailboxSize extends MetricIdentity { val name = "mailbox-size" } + case object TimeInMailbox extends MetricIdentity { val name = "time-in-mailbox" } + case object Errors extends MetricIdentity { val name = "errors" } + + case class ActorMetricsRecorder(processingTime: Histogram, timeInMailbox: Histogram, mailboxSize: MinMaxCounter, + errors: Counter) extends MetricGroupRecorder { + + def collect(context: CollectionContext): ActorMetricSnapshot = + ActorMetricSnapshot( + processingTime.collect(context), + timeInMailbox.collect(context), + mailboxSize.collect(context), + errors.collect(context)) + + def cleanup: Unit = { + processingTime.cleanup + mailboxSize.cleanup + timeInMailbox.cleanup + errors.cleanup + } + } + + case class ActorMetricSnapshot(processingTime: Histogram.Snapshot, timeInMailbox: Histogram.Snapshot, + mailboxSize: Histogram.Snapshot, errors: Counter.Snapshot) extends MetricGroupSnapshot { + + type GroupSnapshotType = ActorMetricSnapshot + + def merge(that: ActorMetricSnapshot, context: CollectionContext): ActorMetricSnapshot = + ActorMetricSnapshot( + processingTime.merge(that.processingTime, context), + timeInMailbox.merge(that.timeInMailbox, context), + mailboxSize.merge(that.mailboxSize, context), + errors.merge(that.errors, context)) + + lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map( + (ProcessingTime -> processingTime), + (MailboxSize -> mailboxSize), + (TimeInMailbox -> timeInMailbox), + (Errors -> errors)) + } + + val Factory = ActorMetricGroupFactory +} + +case object ActorMetricGroupFactory extends MetricGroupFactory { + import kamon.akka.ActorMetrics._ + + type GroupRecorder = ActorMetricsRecorder + + def create(config: Config, system: ActorSystem): ActorMetricsRecorder = { + val settings = config.getConfig("precision.actor") + + val processingTimeConfig = settings.getConfig("processing-time") + val timeInMailboxConfig = settings.getConfig("time-in-mailbox") + val mailboxSizeConfig = settings.getConfig("mailbox-size") + + new ActorMetricsRecorder( + Histogram.fromConfig(processingTimeConfig), + Histogram.fromConfig(timeInMailboxConfig), + MinMaxCounter.fromConfig(mailboxSizeConfig, system), + Counter()) + } +} diff --git a/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala b/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala new file mode 100644 index 00000000..bc013b63 --- /dev/null +++ b/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala @@ -0,0 +1,32 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.akka + +import akka.actor +import akka.actor._ +import kamon._ + +class AkkaExtension(system: ExtendedActorSystem) extends Kamon.Extension { + val config = system.settings.config.getConfig("kamon.akka") + val askPatternTimeoutWarning = config.getString("ask-pattern-timeout-warning") + val dispatcher = system.dispatchers.lookup(config.getString("dispatcher")) +} + +object Akka extends ExtensionId[AkkaExtension] with ExtensionIdProvider { + def lookup(): ExtensionId[_ <: actor.Extension] = Akka + def createExtension(system: ExtendedActorSystem): AkkaExtension = new AkkaExtension(system) +}
\ No newline at end of file diff --git a/kamon-akka/src/main/scala/kamon/akka/DispatcherMetrics.scala b/kamon-akka/src/main/scala/kamon/akka/DispatcherMetrics.scala new file mode 100644 index 00000000..64e16f96 --- /dev/null +++ b/kamon-akka/src/main/scala/kamon/akka/DispatcherMetrics.scala @@ -0,0 +1,94 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.akka + +import akka.actor.ActorSystem +import com.typesafe.config.Config +import kamon.metric._ +import kamon.metric.instrument.Histogram + +case class DispatcherMetrics(name: String) extends MetricGroupIdentity { + val category = DispatcherMetrics +} + +object DispatcherMetrics extends MetricGroupCategory { + val name = "dispatcher" + + case object MaximumPoolSize extends MetricIdentity { val name = "maximum-pool-size" } + case object RunningThreadCount extends MetricIdentity { val name = "running-thread-count" } + case object QueueTaskCount extends MetricIdentity { val name = "queued-task-count" } + case object PoolSize extends MetricIdentity { val name = "pool-size" } + + case class DispatcherMetricRecorder(maximumPoolSize: Histogram, runningThreadCount: Histogram, + queueTaskCount: Histogram, poolSize: Histogram) + extends MetricGroupRecorder { + + def collect(context: CollectionContext): MetricGroupSnapshot = + DispatcherMetricSnapshot( + maximumPoolSize.collect(context), + runningThreadCount.collect(context), + queueTaskCount.collect(context), + poolSize.collect(context)) + + def cleanup: Unit = {} + + } + + case class DispatcherMetricSnapshot(maximumPoolSize: Histogram.Snapshot, runningThreadCount: Histogram.Snapshot, + queueTaskCount: Histogram.Snapshot, poolSize: Histogram.Snapshot) extends MetricGroupSnapshot { + + type GroupSnapshotType = DispatcherMetricSnapshot + + def merge(that: DispatcherMetricSnapshot, context: CollectionContext): DispatcherMetricSnapshot = + DispatcherMetricSnapshot( + maximumPoolSize.merge(that.maximumPoolSize, context), + runningThreadCount.merge(that.runningThreadCount, context), + queueTaskCount.merge(that.queueTaskCount, context), + poolSize.merge(that.poolSize, context)) + + lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map( + (MaximumPoolSize -> maximumPoolSize), + (RunningThreadCount -> runningThreadCount), + (QueueTaskCount -> queueTaskCount), + (PoolSize -> poolSize)) + } + + val Factory = DispatcherMetricGroupFactory +} + +case object DispatcherMetricGroupFactory extends MetricGroupFactory { + + import kamon.akka.DispatcherMetrics._ + + type GroupRecorder = DispatcherMetricRecorder + + def create(config: Config, system: ActorSystem): DispatcherMetricRecorder = { + val settings = config.getConfig("precision.dispatcher") + + val maximumPoolSizeConfig = settings.getConfig("maximum-pool-size") + val runningThreadCountConfig = settings.getConfig("running-thread-count") + val queueTaskCountConfig = settings.getConfig("queued-task-count") + val poolSizeConfig = settings.getConfig("pool-size") + + new DispatcherMetricRecorder( + Histogram.fromConfig(maximumPoolSizeConfig), + Histogram.fromConfig(runningThreadCountConfig), + Histogram.fromConfig(queueTaskCountConfig), + Histogram.fromConfig(poolSizeConfig)) + } + +} diff --git a/kamon-akka/src/main/scala/kamon/akka/RouterMetrics.scala b/kamon-akka/src/main/scala/kamon/akka/RouterMetrics.scala new file mode 100644 index 00000000..2eedf764 --- /dev/null +++ b/kamon-akka/src/main/scala/kamon/akka/RouterMetrics.scala @@ -0,0 +1,89 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ +package kamon.akka + +import akka.actor.ActorSystem +import com.typesafe.config.Config +import kamon.metric._ +import kamon.metric.instrument.{ Counter, Histogram } + +case class RouterMetrics(name: String) extends MetricGroupIdentity { + val category = RouterMetrics +} + +object RouterMetrics extends MetricGroupCategory { + val name = "router" + + case object RoutingTime extends MetricIdentity { val name = "routing-time" } + case object ProcessingTime extends MetricIdentity { val name = "processing-time" } + case object TimeInMailbox extends MetricIdentity { val name = "time-in-mailbox" } + case object Errors extends MetricIdentity { val name = "errors" } + + case class RouterMetricsRecorder(routingTime: Histogram, processingTime: Histogram, timeInMailbox: Histogram, errors: Counter) extends MetricGroupRecorder { + + def collect(context: CollectionContext): RouterMetricSnapshot = + RouterMetricSnapshot(routingTime.collect(context), processingTime.collect(context), timeInMailbox.collect(context), errors.collect(context)) + + def cleanup: Unit = { + routingTime.cleanup + processingTime.cleanup + timeInMailbox.cleanup + errors.cleanup + } + } + + case class RouterMetricSnapshot(routingTime: Histogram.Snapshot, processingTime: Histogram.Snapshot, timeInMailbox: Histogram.Snapshot, errors: Counter.Snapshot) extends MetricGroupSnapshot { + + type GroupSnapshotType = RouterMetricSnapshot + + def merge(that: RouterMetricSnapshot, context: CollectionContext): RouterMetricSnapshot = + RouterMetricSnapshot( + routingTime.merge(that.routingTime, context), + processingTime.merge(that.processingTime, context), + timeInMailbox.merge(that.timeInMailbox, context), + errors.merge(that.errors, context)) + + lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map( + RoutingTime -> routingTime, + ProcessingTime -> processingTime, + TimeInMailbox -> timeInMailbox, + Errors -> errors) + } + + val Factory = RouterMetricGroupFactory +} + +case object RouterMetricGroupFactory extends MetricGroupFactory { + + import kamon.akka.RouterMetrics._ + + type GroupRecorder = RouterMetricsRecorder + + def create(config: Config, system: ActorSystem): RouterMetricsRecorder = { + val settings = config.getConfig("precision.router") + + val routingTimeConfig = settings.getConfig("routing-time") + val processingTimeConfig = settings.getConfig("processing-time") + val timeInMailboxConfig = settings.getConfig("time-in-mailbox") + + new RouterMetricsRecorder( + Histogram.fromConfig(routingTimeConfig), + Histogram.fromConfig(processingTimeConfig), + Histogram.fromConfig(timeInMailboxConfig), + Counter()) + } +} + |