aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala
blob: 9a08da71a61eb75aca7c6349729f7a8a0437f4e4 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
/*
 * =========================================================================================
 * Copyright © 2013 the kamon project <http://kamon.io/>
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the
 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
 * either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 * =========================================================================================
 */

package kamon.metrics

import scala.collection.concurrent.TrieMap
import akka.actor._
import com.typesafe.config.Config
import kamon.util.{ Contexts, GlobPathFilter }
import kamon.Kamon
import akka.actor
import kamon.metrics.Metrics.MetricGroupFilter
import kamon.metrics.Subscriptions.Subscribe

/**
 * Extension that keeps the metric group recorders registered for an actor system, decides which
 * identities are tracked based on the `kamon.metrics.filters` configuration, and forwards
 * subscription requests to the subscriptions actor.
 *
 * @param system actor system this extension belongs to; provides the configuration and hosts the
 *               `kamon-metrics-subscriptions` actor.
 */
class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension {
  /** Configuration of the owning actor system. */
  val config: Config = system.settings.config

  /** Recorders currently registered, keyed by the identity of their metric group. */
  val storage = TrieMap[MetricGroupIdentity, MetricGroupRecorder]()

  /** Include/exclude filters keyed by category name, as read by [[loadFilters]]. */
  val filters: Map[String, MetricGroupFilter] = loadFilters(config)

  /** Actor receiving `Subscribe` messages; created on first access. */
  lazy val subscriptions: ActorRef = system.actorOf(Props[Subscriptions], "kamon-metrics-subscriptions")

  /**
   * Registers a recorder for `identity`, creating it with `factory` when none is stored yet.
   *
   * @param identity identity of the metric group to record.
   * @param factory  factory used to create the recorder if the identity is not registered.
   * @return the recorder stored for `identity`, or `None` when the filters do not accept it.
   */
  def register(identity: MetricGroupIdentity, factory: MetricGroupFactory): Option[factory.GroupRecorder] = {
    if (shouldTrack(identity)) {
      // get + putIfAbsent instead of getOrElseUpdate: the latter is not atomic on every Scala
      // release, so racing callers could end up holding different recorders for one identity.
      // With putIfAbsent the first stored recorder wins and is handed to every caller.
      val recorder = storage.get(identity).getOrElse {
        val candidate = factory.create(config)
        storage.putIfAbsent(identity, candidate).getOrElse(candidate)
      }

      Some(recorder.asInstanceOf[factory.GroupRecorder])
    } else
      None
  }

  /** Removes the recorder stored for `identity`, if any. */
  def unregister(identity: MetricGroupIdentity): Unit = {
    storage.remove(identity)
  }

  /**
   * Sends a `Subscribe` message to the subscriptions actor with `receiver` as the sender.
   *
   * @param category    category of the metric groups of interest.
   * @param selection   selection string passed along in the `Subscribe` message.
   * @param receiver    actor used as the sender of the `Subscribe` message.
   * @param permanently flag passed along in the `Subscribe` message; defaults to `false`.
   */
  def subscribe[C <: MetricGroupCategory](category: C, selection: String, receiver: ActorRef, permanently: Boolean = false): Unit = {
    subscriptions.tell(Subscribe(category, selection, permanently), receiver)
  }

  /** Calls `collect` on every registered recorder and returns the results keyed by identity. */
  def collect: Map[MetricGroupIdentity, MetricGroupSnapshot] = {
    (for ((identity, recorder) <- storage) yield (identity, recorder.collect)).toMap
  }

  /** An identity is tracked only when a filter exists for its category and accepts its name. */
  private def shouldTrack(identity: MetricGroupIdentity): Boolean = {
    filters.get(identity.category.name).exists(_.accept(identity.name))
  }

  /**
   * Reads the `kamon.metrics.filters` object list and builds one filter per entry key from the
   * `includes` and `excludes` string lists found under that key.
   *
   * @param config configuration containing the `kamon.metrics.filters` list.
   * @return the filters keyed by entry key; for a repeated key the later entry replaces the earlier.
   */
  def loadFilters(config: Config): Map[String, MetricGroupFilter] = {
    import scala.collection.JavaConverters._

    val filterObjects = config.getObjectList("kamon.metrics.filters").asScala

    val allFilters =
      for (
        filterObject <- filterObjects;
        entry <- filterObject.entrySet().asScala
      ) yield {
        val key = entry.getKey
        // Wrap the entry value under its own key so it can be queried as a Config.
        // NOTE(review): `key` is spliced into a path expression below; a key containing '.'
        // would presumably be read as a nested path - confirm whether such keys can occur.
        val keyBasedConfig = entry.getValue.atKey(key)

        val includes = keyBasedConfig.getStringList(s"$key.includes").asScala.map(inc => new GlobPathFilter(inc)).toList
        val excludes = keyBasedConfig.getStringList(s"$key.excludes").asScala.map(exc => new GlobPathFilter(exc)).toList

        (key, MetricGroupFilter(includes, excludes))
      }

    allFilters.toMap
  }

  /** Execution context looked up through `Contexts` for the Kamon default dispatcher. */
  val defaultDispatcher = Contexts.lookupExecutionContext(Contexts.kamonDefaultDispatcher)(system)
}

/**
 * Extension id for [[MetricsExtension]]; also hosts the filter type used to decide which metric
 * group names are tracked.
 */
object Metrics extends ExtensionId[MetricsExtension] with ExtensionIdProvider {
  /** Builds the extension instance for the given actor system. */
  def createExtension(system: ExtendedActorSystem): MetricsExtension = new MetricsExtension(system)

  /** This object is its own extension id. */
  def lookup(): ExtensionId[_ <: actor.Extension] = Metrics

  /**
   * Pair of include and exclude glob filters.
   *
   * @param includes filters of which at least one must accept a name.
   * @param excludes filters of which none may accept a name.
   */
  case class MetricGroupFilter(includes: List[GlobPathFilter], excludes: List[GlobPathFilter]) {
    /** A name is accepted when some include filter matches it and no exclude filter does. */
    def accept(name: String): Boolean = {
      def matchedByAny(candidates: List[GlobPathFilter]): Boolean =
        candidates.exists(candidate => candidate.accept(name))

      // Includes are checked first; excludes are only consulted when an include matched.
      matchedByAny(includes) && !matchedByAny(excludes)
    }
  }
}