aboutsummaryrefslogblamecommitdiff
path: root/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
blob: 06b9761a040ca443152a8a759a572ccfd718c4cf (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15














                                                                                             

             
                                    




                                                                
                       
                                    
                                                      
                              



                                                                                   
                                        
                                          
                                  
 






                                       
                                                          





                                                      
                        
                                     
 

                                                                       
                                                                                       

                                                                     
 
                                      

 
                         
 


                                                    
 
                                                                                                                                               
                                                                           
                                                                                                                                       




























                                                                                            


         
 

                                                                      
 

                                                                                    
 


                                                                                                    

                                                                    
 

                                                                                  
 
 
                                                                                                                                       




                                                                           
                        

                                                                         
 






                                                                          
 

                                 
 

                                                           
 
     
 








                                                                                                    
 






                                                                        
 

                               
 


                                                         
 



                                                                                                         
 


                                                                                                   
 




                                                             
       
 




                                                           
       
 
                                                                
     
 
                                                                         

                                                               
 
                                                                                                              
                             
 
                                                                                                          
                            
 
                                             



















                                                                                            
     
 
 


                                                                                
 

                                         
 
                                        

                                                                   


                                                                                        


                                   
                                                     
                                                                                                                            

         
     
 




                                                                               
 




                                                                                                              
     
 




                                                        
     
 

                                                                                
 






                                                                                 


                                                                
 

                                                                            
 






                                                                               


                                                                
 




                                             
                                 

                                                           
 









                                                                            
 


                                                                                                                                        
                                                                         
                                          
 
                             
                                               
                                            

                              

                                                
 


                                                    






                                                                     
 


                                                                                                
 

                                   
 
                                    



                                                                         











                                                                                                           
     
 
 



                                                                                                        
 

                                                                                            
 
                    




                                                                                        

                                     

       
 


                                                                              
                                                                                                    




                                                                                  

                                                                                                          

   
 
/* =========================================================================================
 * Copyright © 2013-2017 the kamon project <http://kamon.io/>
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the
 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
 * either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 * =========================================================================================
 */

package kamon

import java.time.{Duration, Instant}
import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
import java.util.concurrent._

import com.typesafe.config.Config
import kamon.metric._
import kamon.trace.Span
import kamon.trace.Span.FinishedSpan
import kamon.util.{Clock, DynamicAccess, Registration}
import org.slf4j.LoggerFactory

import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}
import scala.util.Try
import scala.util.control.NonFatal
import scala.collection.JavaConverters._
import scala.collection.concurrent.TrieMap
import scala.concurrent.duration._

sealed trait Reporter {
  def start(): Unit
  def stop(): Unit
  def reconfigure(config: Config): Unit
}

trait MetricReporter extends Reporter {
  def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit
}

trait SpanReporter extends Reporter {
  def reportSpans(spans: Seq[Span.FinishedSpan]): Unit
}

trait ReporterRegistry {
  def loadReportersFromConfig(): Unit

  def addReporter(reporter: MetricReporter): Registration
  def addReporter(reporter: MetricReporter, name: String): Registration
  def addReporter(reporter: MetricReporter, name: String, filter: String): Registration
  def addReporter(reporter: SpanReporter): Registration
  def addReporter(reporter: SpanReporter, name: String): Registration

  def stopAllReporters(): Future[Unit]
}

object ReporterRegistry {

  private[kamon] trait SpanSink {
    def reportSpan(finishedSpan: FinishedSpan): Unit
  }

  private[kamon] class Default(metrics: MetricsSnapshotGenerator, initialConfig: Config, clock: Clock) extends ReporterRegistry with SpanSink {
    private val logger = LoggerFactory.getLogger(classOf[ReporterRegistry])
    private val registryExecutionContext = Executors.newScheduledThreadPool(2, threadFactory("kamon-reporter-registry", daemon = true))
    private val reporterCounter = new AtomicLong(0L)
    private var registryConfiguration = readRegistryConfiguration(initialConfig)

    private val metricReporters = TrieMap[Long, MetricReporterEntry]()
    private val metricReporterTickerSchedule = new AtomicReference[ScheduledFuture[_]]()
    private val spanReporters = TrieMap[Long, SpanReporterEntry]()
    private val spanReporterTickerSchedule = new AtomicReference[ScheduledFuture[_]]()


    reconfigure(initialConfig)

    override def loadReportersFromConfig(): Unit = {
      if(registryConfiguration.configuredReporters.isEmpty)
        logger.info("The kamon.reporters setting is empty, no reporters have been started.")
      else {
        registryConfiguration.configuredReporters.foreach { reporterFQCN =>
          val dynamicAccess = new DynamicAccess(getClass.getClassLoader)
          dynamicAccess.createInstanceFor[Reporter](reporterFQCN, Nil).map({
            case mr: MetricReporter =>
              addMetricReporter(mr, "loaded-from-config: " + reporterFQCN)
              logger.info("Loaded metric reporter [{}]", reporterFQCN)

            case sr: SpanReporter =>
              addSpanReporter(sr, "loaded-from-config: " + reporterFQCN)
              logger.info("Loaded span reporter [{}]", reporterFQCN)

          }).failed.foreach {
            t => logger.error(s"Failed to load configured reporter [$reporterFQCN]", t)
          }
        }
      }
    }

    override def addReporter(reporter: MetricReporter): Registration =
      addMetricReporter(reporter, reporter.getClass.getName())

    override def addReporter(reporter: MetricReporter, name: String): Registration =
      addMetricReporter(reporter, name)

    override def addReporter(reporter: MetricReporter, name: String, filter: String): Registration =
      addMetricReporter(reporter, name, Some(filter))

    override def addReporter(reporter: SpanReporter): Registration =
      addSpanReporter(reporter, reporter.getClass.getName())

    override def addReporter(reporter: SpanReporter, name: String): Registration =
      addSpanReporter(reporter, name)


    private def addMetricReporter(reporter: MetricReporter, name: String, filter: Option[String] = None): Registration = synchronized {
      val executor = Executors.newSingleThreadExecutor(threadFactory(name))
      val reporterEntry = new MetricReporterEntry(
        id = reporterCounter.getAndIncrement(),
        name = name,
        reporter = reporter,
        filter = filter,
        executionContext = ExecutionContext.fromExecutorService(executor)
      )

      Future {
        Try {
          reporterEntry.reporter.start()
        }.failed.foreach { error =>
          logger.error(s"Metric reporter [$name] failed to start.", error)
        }
      }(reporterEntry.executionContext)

      if(metricReporters.isEmpty)
        reStartMetricTicker()

      metricReporters.put(reporterEntry.id, reporterEntry)
      createRegistration(reporterEntry.id, metricReporters)

    }

    private def addSpanReporter(reporter: SpanReporter, name: String): Registration = synchronized {
      val executor = Executors.newSingleThreadExecutor(threadFactory(name))
      val reporterEntry = new SpanReporterEntry(
        id = reporterCounter.incrementAndGet(),
        name = name,
        reporter = reporter,
        bufferCapacity = registryConfiguration.traceReporterQueueSize,
        executionContext = ExecutionContext.fromExecutorService(executor)
      )

      Future {
        Try {
          reporterEntry.reporter.start()
        }.failed.foreach { error =>
          logger.error(s"Span reporter [$name] failed to start.", error)
        }
      }(reporterEntry.executionContext)

      if(spanReporters.isEmpty)
        reStartTraceTicker()

      spanReporters.put(reporterEntry.id, reporterEntry)
      createRegistration(reporterEntry.id, spanReporters)
    }

    private def createRegistration(id: Long, target: TrieMap[Long, _]): Registration = new Registration {
      override def cancel(): Boolean =
        target.remove(id).nonEmpty
    }

    override def stopAllReporters(): Future[Unit] = {
      implicit val stopReporterExeContext = ExecutionContext.fromExecutor(registryExecutionContext)
      val reporterStopFutures = Vector.newBuilder[Future[Unit]]

      while(metricReporters.nonEmpty) {
        val (idToRemove, _) = metricReporters.head
        metricReporters.remove(idToRemove).foreach { entry =>
          reporterStopFutures += stopMetricReporter(entry)
        }
      }

      while(spanReporters.nonEmpty) {
        val (idToRemove, _) = spanReporters.head
        spanReporters.remove(idToRemove).foreach { entry =>
          reporterStopFutures += stopSpanReporter(entry)
        }
      }

      Future.sequence(reporterStopFutures.result()).map(_ => ())
    }

    private[kamon] def reconfigure(config: Config): Unit = synchronized {
      val oldConfig = registryConfiguration
      registryConfiguration = readRegistryConfiguration(config)

      if(oldConfig.metricTickInterval != registryConfiguration.metricTickInterval && metricReporters.nonEmpty)
        reStartMetricTicker()

      if(oldConfig.traceTickInterval != registryConfiguration.traceTickInterval && spanReporters.nonEmpty)
        reStartTraceTicker()

      // Reconfigure all registered reporters
      metricReporters.foreach {
        case (_, entry) =>
          Future {
            Try {
              entry.reporter.reconfigure(config)
            }.failed.foreach { error =>
              logger.error(s"Metric reporter [${entry.name}] failed to reconfigure.", error)
            }
          }(entry.executionContext)
      }
      spanReporters.foreach   {
        case (_, entry) =>
          Future {
            Try {
              entry.reporter.reconfigure(config)
            }.failed.foreach { error =>
              logger.error(s"Span reporter [${entry.name}] failed to reconfigure.", error)
            }
          }(entry.executionContext)
      }
    }


    private def reStartMetricTicker(): Unit = {
      val tickIntervalMillis = registryConfiguration.metricTickInterval.toMillis
      val currentMetricTicker = metricReporterTickerSchedule.get()

      if(currentMetricTicker != null)
        currentMetricTicker.cancel(false)

      metricReporterTickerSchedule.set {
        val initialDelay =
          if(registryConfiguration.optimisticMetricTickAlignment) {
            val now = clock.instant()
            val nextTick = Clock.nextTick(now, registryConfiguration.metricTickInterval)
            Duration.between(now, nextTick).toMillis

          } else tickIntervalMillis

        registryExecutionContext.scheduleAtFixedRate(
          new MetricReporterTicker(metrics, metricReporters, clock), initialDelay, tickIntervalMillis, TimeUnit.MILLISECONDS
        )
      }
    }

    private def reStartTraceTicker(): Unit = {
      val tickIntervalMillis = registryConfiguration.traceTickInterval.toMillis
      val currentSpanTicker = spanReporterTickerSchedule.get()
      if(currentSpanTicker  != null)
        currentSpanTicker.cancel(false)

      spanReporterTickerSchedule.set {
        registryExecutionContext.scheduleAtFixedRate(
          new SpanReporterTicker(spanReporters), tickIntervalMillis, tickIntervalMillis, TimeUnit.MILLISECONDS
        )
      }
    }

    def reportSpan(span: Span.FinishedSpan): Unit = {
      spanReporters.foreach { case (_, reporterEntry) =>
        if(reporterEntry.isActive)
          reporterEntry.buffer.offer(span)
      }
    }

    private def stopMetricReporter(entry: MetricReporterEntry): Future[Unit] = {
      entry.isActive = false

      Future {
        Try {
          entry.reporter.stop()
        }.failed.foreach { error =>
          logger.error(s"Metric reporter [${entry.name}] failed to stop.", error)
        }
      }(entry.executionContext).andThen {
        case _ => entry.executionContext.shutdown()
      }(ExecutionContext.fromExecutor(registryExecutionContext))
    }

    private def stopSpanReporter(entry: SpanReporterEntry): Future[Unit] = {
      entry.isActive = false

      Future {
        Try {
          entry.reporter.stop()
        }.failed.foreach { error =>
          logger.error(s"Span reporter [${entry.name}] failed to stop.", error)
        }
      }(entry.executionContext).andThen {
        case _ => entry.executionContext.shutdown()
      }(ExecutionContext.fromExecutor(registryExecutionContext))
    }

    private class MetricReporterEntry(
      @volatile var isActive: Boolean = true,
      val id: Long,
      val name: String,
      val reporter: MetricReporter,
      val filter: Option[String],
      val executionContext: ExecutionContextExecutorService
    )

    private class SpanReporterEntry(
      @volatile var isActive: Boolean = true,
      val id: Long,
      val name: String,
      val reporter: SpanReporter,
      val bufferCapacity: Int,
      val executionContext: ExecutionContextExecutorService
    ) {
      val buffer = new ArrayBlockingQueue[Span.FinishedSpan](bufferCapacity)
    }

    private class MetricReporterTicker(snapshotGenerator: MetricsSnapshotGenerator, reporterEntries: TrieMap[Long, MetricReporterEntry],
        clock: Clock) extends Runnable {

      val logger = LoggerFactory.getLogger(classOf[MetricReporterTicker])
      var lastInstant = Instant.now(clock)

      def run(): Unit = try {
        val currentInstant = Instant.now(clock)
        val periodSnapshot = PeriodSnapshot(
          from = lastInstant,
          to = currentInstant,
          metrics = snapshotGenerator.snapshot()
        )

        reporterEntries.foreach { case (_, entry) =>
          Future {
            Try {
              if (entry.isActive) {
                val filteredSnapshot = entry.filter
                  .map(f => filterMetrics(f, periodSnapshot))
                  .getOrElse(periodSnapshot)

                entry.reporter.reportPeriodSnapshot(filteredSnapshot)
              }

            }.failed.foreach { error =>
              logger.error(s"Reporter [${entry.name}] failed to process a metrics tick.", error)
            }

          }(entry.executionContext)
        }

        lastInstant = currentInstant

      } catch {
        case NonFatal(t) => logger.error("Error while running a tick", t)
      }

      private def filterMetrics(filterName: String, periodSnapshot: PeriodSnapshot): PeriodSnapshot = {
        val metricFilter = Kamon.filter(filterName)
        val counters = periodSnapshot.metrics.counters.filter(c => metricFilter.accept(c.name))
        val gauges = periodSnapshot.metrics.gauges.filter(g => metricFilter.accept(g.name))
        val histograms = periodSnapshot.metrics.histograms.filter(h => metricFilter.accept(h.name))
        val rangeSamplers = periodSnapshot.metrics.rangeSamplers.filter(rs => metricFilter.accept(rs.name))

        periodSnapshot.copy(metrics = MetricsSnapshot(
          histograms, rangeSamplers, gauges, counters
        ))
      }
    }


    private class SpanReporterTicker(spanReporters: TrieMap[Long, SpanReporterEntry]) extends Runnable {
      override def run(): Unit = {
        spanReporters.foreach {
          case (_, entry) =>

            val spanBatch = new java.util.ArrayList[Span.FinishedSpan](entry.bufferCapacity)
            entry.buffer.drainTo(spanBatch, entry.bufferCapacity)

            Future {
              Try {
                entry.reporter.reportSpans(spanBatch.asScala)
              }.failed.foreach { error =>
                logger.error(s"Reporter [${entry.name}] failed to report spans.", error)
              }
            }(entry.executionContext)
        }
      }
    }

    private def readRegistryConfiguration(config: Config): Configuration =
      Configuration(
        metricTickInterval = config.getDuration("kamon.metric.tick-interval"),
        optimisticMetricTickAlignment = config.getBoolean("kamon.metric.optimistic-tick-alignment"),
        traceTickInterval = config.getDuration("kamon.trace.tick-interval"),
        traceReporterQueueSize = config.getInt("kamon.trace.reporter-queue-size"),
        configuredReporters = config.getStringList("kamon.reporters").asScala
      )

    private case class Configuration(metricTickInterval: Duration, optimisticMetricTickAlignment: Boolean,
      traceTickInterval: Duration, traceReporterQueueSize: Int, configuredReporters: Seq[String])
  }
}