aboutsummaryrefslogblamecommitdiff
path: root/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
blob: a22162eb2f1a3327735e14c98fa5fe32909bdebd (plain) (tree)
1
2
3
4
5
6
7
8
9

             



                                                                
                                       
                     
                       



                                                                                   
                                        


                            
 

                                                                

                                                


                             





                       
                   

                  

                                                      


                     
                   

                  
                                       


                                                
                                                                                                               


                                                                                                                             
                                                                        










                                                                             
                                          









                                                                       



                                          



       







                                                             









                                                                                                 
                                                                         


                                                                       
                                                                                                    





                                                   
                                                       




                                                                        
                                                                                                                 



       

                                                                   
                                                                

   







                                                                   
                              
                                           


                                                         

   

                                                                                                                                             
                                             

                           
                                                  

                                                   
                                              

       
                                                           

                            
                                                           









                                                                       
 
package kamon

import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
import java.util.concurrent._

import com.typesafe.config.Config
import com.typesafe.scalalogging.Logger
import kamon.metric._
import kamon.trace.Span

import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}
import scala.util.Try
import scala.util.control.NonFatal
import scala.collection.JavaConverters._

trait ReporterRegistry {
  def loadFromConfig(): Unit

  def add(reporter: MetricsReporter): Registration
  def add(reporter: MetricsReporter, name: String): Registration
  def add(reporter: SpansReporter): Registration

  def stopAll(): Future[Unit]
}


trait Registration {
  def cancel(): Boolean
}

trait MetricsReporter {
  def start(): Unit
  def stop(): Unit

  def reconfigure(config: Config): Unit
  def reportTickSnapshot(snapshot: TickSnapshot): Unit
}

trait SpansReporter {
  def start(): Unit
  def stop(): Unit

  def reconfigure(config: Config): Unit
  def reportSpan(span: Span.CompletedSpan): Unit
}

class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Config) extends ReporterRegistry {
  private val registryExecutionContext = Executors.newSingleThreadScheduledExecutor(threadFactory("kamon-reporter-registry"))
  private val metricsTickerSchedule = new AtomicReference[ScheduledFuture[_]]()
  private val metricReporters = new ConcurrentLinkedQueue[ReporterEntry]()
  private val spanReporters = new ConcurrentLinkedQueue[SpansReporter]()
  private val reporterCounter = new AtomicLong(0L)

  reconfigure(initialConfig)

  override def loadFromConfig(): Unit = ???

  override def add(reporter: MetricsReporter): Registration =
    add(reporter, reporter.getClass.getName())

  override def add(reporter: MetricsReporter, name: String): Registration = {
    val executor = Executors.newSingleThreadExecutor(threadFactory(name))
    val reporterEntry = new ReporterEntry(
      id = reporterCounter.getAndIncrement(),
      reporter = reporter,
      executionContext = ExecutionContext.fromExecutorService(executor)
    )

    metricReporters.add(reporterEntry)

    new Registration {
      val reporterID = reporterEntry.id
      override def cancel(): Boolean = {
        metricReporters.iterator().asScala
          .find(e => e.id == reporterID)
          .map(e => stopReporter(e))
          .isDefined
      }
    }
  }

  override def add(reporter: SpansReporter): Registration = {
    spanReporters.add(reporter)

    new Registration {
      override def cancel(): Boolean = true
    }
  }

  override def stopAll(): Future[Unit] = {
    implicit val stopReporterExeContext = ExecutionContext.fromExecutor(registryExecutionContext)
    val reporterStopFutures = Vector.newBuilder[Future[Unit]]
    while(!metricReporters.isEmpty) {
      val entry = metricReporters.poll()
      if(entry != null) {
        reporterStopFutures += stopReporter(entry)
      }
    }

    Future.sequence(reporterStopFutures.result()).map(_ => Try((): Unit))
  }

  private[kamon] def reconfigure(config: Config): Unit = synchronized {
    val tickIntervalMillis = config.getDuration("kamon.metric.tick-interval", TimeUnit.MILLISECONDS)
    val currentTicker = metricsTickerSchedule.get()
    if(currentTicker != null) {
      currentTicker.cancel(true)
    }

    // Reconfigure all registered reporters
    metricReporters.iterator().asScala.foreach(entry =>
      Future(entry.reporter.reconfigure(config))(entry.executionContext)
    )

    metricsTickerSchedule.set {
      registryExecutionContext.scheduleAtFixedRate(
        new MetricTicker(metrics, metricReporters), tickIntervalMillis, tickIntervalMillis, TimeUnit.MILLISECONDS
      )
    }
  }


  private[kamon] def reportSpan(span: Span.CompletedSpan): Unit = {
    spanReporters.iterator().asScala.foreach(_.reportSpan(span))
  }

  private def stopReporter(entry: ReporterEntry): Future[Unit] = {
    entry.isActive = false

    Future(entry.reporter.stop())(entry.executionContext).andThen {
      case _ => entry.executionContext.shutdown()
    }(ExecutionContext.fromExecutor(registryExecutionContext))
  }

  private class ReporterEntry(
    @volatile var isActive: Boolean = true,
    val id: Long,
    val reporter: MetricsReporter,
    val executionContext: ExecutionContextExecutorService
  )

  private class MetricTicker(snapshotGenerator: MetricsSnapshotGenerator, reporterEntries: java.util.Queue[ReporterEntry]) extends Runnable {
    val logger = Logger(classOf[MetricTicker])
    var lastTick = System.currentTimeMillis()

    def run(): Unit = try {
      val currentTick = System.currentTimeMillis()
      val tickSnapshot = TickSnapshot(
        interval = Interval(lastTick, currentTick),
        metrics = snapshotGenerator.snapshot()
      )

      reporterEntries.iterator().asScala.foreach { entry =>
        Future {
          if(entry.isActive)
            entry.reporter.reportTickSnapshot(tickSnapshot)

        }(executor = entry.executionContext)
      }

      lastTick = currentTick

    } catch {
      case NonFatal(t) => logger.error("Error while running a tick", t)
    }
  }
}