/* ========================================================================================= * Copyright © 2013-2017 the kamon project * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, * either express or implied. See the License for the specific language governing permissions * and limitations under the License. * ========================================================================================= */ package kamon import java.util.concurrent.atomic.{AtomicLong, AtomicReference} import java.util.concurrent._ import com.typesafe.config.Config import com.typesafe.scalalogging.Logger import kamon.metric._ import kamon.trace.Span import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future} import scala.util.Try import scala.util.control.NonFatal import scala.collection.JavaConverters._ import scala.collection.concurrent.TrieMap trait ReporterRegistry { def loadReportersFromConfig(): Unit def addReporter(reporter: MetricReporter): Registration def addReporter(reporter: MetricReporter, name: String): Registration def addReporter(reporter: SpanReporter): Registration def addReporter(reporter: SpanReporter, name: String): Registration def stopAllReporters(): Future[Unit] } trait Registration { def cancel(): Boolean } trait MetricReporter { def start(): Unit def stop(): Unit def reconfigure(config: Config): Unit def reportTickSnapshot(snapshot: TickSnapshot): Unit } trait SpanReporter { def start(): Unit def stop(): Unit def reconfigure(config: Config): Unit def reportSpans(spans: Seq[Span.CompletedSpan]): Unit } class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Config) extends ReporterRegistry { private val registryExecutionContext = Executors.newScheduledThreadPool(2, threadFactory("kamon-reporter-registry")) private val reporterCounter = new AtomicLong(0L) private val metricReporters = TrieMap[Long, MetricReporterEntry]() private val metricReporterTickerSchedule = new AtomicReference[ScheduledFuture[_]]() private val spanReporters = TrieMap[Long, SpanReporterEntry]() private val spanReporterTickerSchedule = new AtomicReference[ScheduledFuture[_]]() reconfigure(initialConfig) override def loadReportersFromConfig(): Unit = ??? override def addReporter(reporter: MetricReporter): Registration = addMetricReporter(reporter, reporter.getClass.getName()) override def addReporter(reporter: MetricReporter, name: String): Registration = addMetricReporter(reporter, name) override def addReporter(reporter: SpanReporter): Registration = addSpanReporter(reporter, reporter.getClass.getName()) override def addReporter(reporter: SpanReporter, name: String): Registration = addSpanReporter(reporter, name) private def addMetricReporter(reporter: MetricReporter, name: String): Registration = synchronized { val executor = Executors.newSingleThreadExecutor(threadFactory(name)) val reporterEntry = new MetricReporterEntry( id = reporterCounter.getAndIncrement(), reporter = reporter, executionContext = ExecutionContext.fromExecutorService(executor) ) metricReporters.put(reporterEntry.id, reporterEntry) createRegistration(reporterEntry.id, metricReporters) } private def addSpanReporter(reporter: SpanReporter, name: String): Registration = synchronized { val executor = Executors.newSingleThreadExecutor(threadFactory(name)) val reporterEntry = new SpanReporterEntry( id = reporterCounter.incrementAndGet(), reporter = reporter, bufferCapacity = 1024, executionContext = ExecutionContext.fromExecutorService(executor) ) spanReporters.put(reporterEntry.id, reporterEntry) createRegistration(reporterEntry.id, spanReporters) } private def createRegistration(id: Long, target: TrieMap[Long, _]): Registration = new Registration { override def cancel(): Boolean = target.remove(id).nonEmpty } override def stopAllReporters(): Future[Unit] = { implicit val stopReporterExeContext = ExecutionContext.fromExecutor(registryExecutionContext) val reporterStopFutures = Vector.newBuilder[Future[Unit]] while(metricReporters.nonEmpty) { val (idToRemove, _) = metricReporters.head metricReporters.remove(idToRemove).foreach { entry => reporterStopFutures += stopMetricReporter(entry) } } while(spanReporters.nonEmpty) { val (idToRemove, _) = spanReporters.head spanReporters.remove(idToRemove).foreach { entry => reporterStopFutures += stopSpanReporter(entry) } } Future.sequence(reporterStopFutures.result()).map(_ => Try((): Unit)) } private[kamon] def reconfigure(config: Config): Unit = synchronized { val tickIntervalMillis = config.getDuration("kamon.metric.tick-interval", TimeUnit.MILLISECONDS) val traceTickIntervalMillis = config.getDuration("kamon.trace.tick-interval", TimeUnit.MILLISECONDS) val currentMetricTicker = metricReporterTickerSchedule.get() if(currentMetricTicker != null) { currentMetricTicker.cancel(true) } val currentSpanTicker = spanReporterTickerSchedule.get() if(currentSpanTicker != null) { currentSpanTicker.cancel(true) } // Reconfigure all registered reporters metricReporters.foreach { case (_, entry) => Future(entry.reporter.reconfigure(config))(entry.executionContext) } spanReporters.foreach { case (_, entry) => Future(entry.reporter.reconfigure(config))(entry.executionContext) } metricReporterTickerSchedule.set { registryExecutionContext.scheduleAtFixedRate( new MetricReporterTicker(metrics, metricReporters), tickIntervalMillis, tickIntervalMillis, TimeUnit.MILLISECONDS ) } spanReporterTickerSchedule.set { registryExecutionContext.scheduleAtFixedRate( new SpanReporterTicker(spanReporters), traceTickIntervalMillis, traceTickIntervalMillis, TimeUnit.MILLISECONDS ) } } private[kamon] def reportSpan(span: Span.CompletedSpan): Unit = { spanReporters.foreach { case (_, reporterEntry) => if(reporterEntry.isActive) reporterEntry.buffer.offer(span) } } private def stopMetricReporter(entry: MetricReporterEntry): Future[Unit] = { entry.isActive = false Future(entry.reporter.stop())(entry.executionContext).andThen { case _ => entry.executionContext.shutdown() }(ExecutionContext.fromExecutor(registryExecutionContext)) } private def stopSpanReporter(entry: SpanReporterEntry): Future[Unit] = { entry.isActive = false Future(entry.reporter.stop())(entry.executionContext).andThen { case _ => entry.executionContext.shutdown() }(ExecutionContext.fromExecutor(registryExecutionContext)) } private class MetricReporterEntry( @volatile var isActive: Boolean = true, val id: Long, val reporter: MetricReporter, val executionContext: ExecutionContextExecutorService ) private class SpanReporterEntry( @volatile var isActive: Boolean = true, val id: Long, val reporter: SpanReporter, val bufferCapacity: Int, val executionContext: ExecutionContextExecutorService ) { val buffer = new ArrayBlockingQueue[Span.CompletedSpan](bufferCapacity) } private class MetricReporterTicker(snapshotGenerator: MetricsSnapshotGenerator, reporterEntries: TrieMap[Long, MetricReporterEntry]) extends Runnable { val logger = Logger(classOf[MetricReporterTicker]) var lastTick = System.currentTimeMillis() def run(): Unit = try { val currentTick = System.currentTimeMillis() val tickSnapshot = TickSnapshot( interval = Interval(lastTick, currentTick), metrics = snapshotGenerator.snapshot() ) reporterEntries.foreach { case (_, entry) => Future { if(entry.isActive) entry.reporter.reportTickSnapshot(tickSnapshot) }(executor = entry.executionContext) } lastTick = currentTick } catch { case NonFatal(t) => logger.error("Error while running a tick", t) } } private class SpanReporterTicker(spanReporters: TrieMap[Long, SpanReporterEntry]) extends Runnable { override def run(): Unit = { spanReporters.foreach { case (_, entry) => val spanBatch = new java.util.ArrayList[Span.CompletedSpan](entry.bufferCapacity) entry.buffer.drainTo(spanBatch, entry.bufferCapacity) Future { entry.reporter.reportSpans(spanBatch.asScala) }(entry.executionContext) } } } }