/* =========================================================================================
* Copyright © 2013-2017 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
* =========================================================================================
*/
package kamon
import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
import java.util.concurrent._
import com.typesafe.config.Config
import com.typesafe.scalalogging.Logger
import kamon.metric._
import kamon.trace.Span
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}
import scala.util.Try
import scala.util.control.NonFatal
import scala.collection.JavaConverters._
import scala.collection.concurrent.TrieMap
/**
  * Entry point for registering metric and span reporters and for stopping them as a group.
  */
trait ReporterRegistry {

  /** Loads reporters based on configuration. The configuration keys involved are not visible from this trait. */
  def loadReportersFromConfig(): Unit

  /**
    * Registers a metric reporter without an explicit name.
    *
    * @param reporter the metric reporter to register.
    * @return a handle that can be used to cancel the registration.
    */
  def addReporter(reporter: MetricReporter): Registration

  /**
    * Registers a metric reporter under the given name.
    *
    * @param reporter the metric reporter to register.
    * @param name     name associated with the reporter.
    * @return a handle that can be used to cancel the registration.
    */
  def addReporter(reporter: MetricReporter, name: String): Registration

  /**
    * Registers a span reporter without an explicit name.
    *
    * @param reporter the span reporter to register.
    * @return a handle that can be used to cancel the registration.
    */
  def addReporter(reporter: SpanReporter): Registration

  /**
    * Registers a span reporter under the given name.
    *
    * @param reporter the span reporter to register.
    * @param name     name associated with the reporter.
    * @return a handle that can be used to cancel the registration.
    */
  def addReporter(reporter: SpanReporter, name: String): Registration

  /**
    * Stops every registered reporter.
    *
    * @return a future that completes once all reporters have been asked to stop.
    */
  def stopAllReporters(): Future[Unit]
}
/**
  * Handle returned when a reporter is added to a [[ReporterRegistry]].
  */
trait Registration {

  /**
    * Cancels the registration this handle stands for.
    *
    * @return whether the cancellation took effect; the registry implementation in this file
    *         reports `true` only when the reporter was still registered.
    */
  def cancel(): Boolean
}
/**
  * Contract for components that receive periodic metric snapshots.
  */
trait MetricReporter {

  /** Lifecycle hook for starting the reporter. */
  def start(): Unit

  /** Lifecycle hook for stopping the reporter. */
  def stop(): Unit

  /**
    * Notifies the reporter of a new configuration.
    *
    * @param config the configuration to apply.
    */
  def reconfigure(config: Config): Unit

  /**
    * Delivers the metrics snapshot produced for one tick.
    *
    * @param snapshot the snapshot to report.
    */
  def reportTickSnapshot(snapshot: TickSnapshot): Unit
}
/**
  * Contract for components that receive batches of completed spans.
  */
trait SpanReporter {

  /** Lifecycle hook for starting the reporter. */
  def start(): Unit

  /** Lifecycle hook for stopping the reporter. */
  def stop(): Unit

  /**
    * Notifies the reporter of a new configuration.
    *
    * @param config the configuration to apply.
    */
  def reconfigure(config: Config): Unit

  /**
    * Delivers a batch of completed spans.
    *
    * @param spans the spans to report; the batch may be empty.
    */
  def reportSpans(spans: Seq[Span.CompletedSpan]): Unit
}
/**
  * [[ReporterRegistry]] implementation that gives every reporter its own single-thread executor and
  * drives two periodic tickers (one for metrics, one for spans) from a shared two-thread scheduler.
  *
  * Tick intervals are read from `kamon.metric.tick-interval` and `kamon.trace.tick-interval`.
  *
  * @param metrics       source of the metric snapshots handed to metric reporters on every tick.
  * @param initialConfig configuration used to schedule the tickers when the registry is created.
  */
class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Config) extends ReporterRegistry {
  private val logger = Logger(classOf[ReporterRegistryImpl])
  private val registryExecutionContext = Executors.newScheduledThreadPool(2, threadFactory("kamon-reporter-registry"))
  // Used only for lightweight callbacks (executor shutdown, failure logging, future sequencing).
  private val registryCallbackContext: ExecutionContext = ExecutionContext.fromExecutor(registryExecutionContext)
  private val reporterCounter = new AtomicLong(0L)
  private val metricReporters = TrieMap[Long, MetricReporterEntry]()
  private val metricReporterTickerSchedule = new AtomicReference[ScheduledFuture[_]]()
  private val spanReporters = TrieMap[Long, SpanReporterEntry]()
  private val spanReporterTickerSchedule = new AtomicReference[ScheduledFuture[_]]()

  // Must run after all fields above are initialized: it schedules the tickers for the first time.
  reconfigure(initialConfig)

  /** Not implemented yet: calling it throws `NotImplementedError`. */
  override def loadReportersFromConfig(): Unit = ???

  /** Registers the metric reporter using its class name as the reporter name. */
  override def addReporter(reporter: MetricReporter): Registration =
    addMetricReporter(reporter, reporter.getClass.getName())

  /** Registers the metric reporter; `name` is passed to the thread factory of its dedicated executor. */
  override def addReporter(reporter: MetricReporter, name: String): Registration =
    addMetricReporter(reporter, name)

  /** Registers the span reporter using its class name as the reporter name. */
  override def addReporter(reporter: SpanReporter): Registration =
    addSpanReporter(reporter, reporter.getClass.getName())

  /** Registers the span reporter; `name` is passed to the thread factory of its dedicated executor. */
  override def addReporter(reporter: SpanReporter, name: String): Registration =
    addSpanReporter(reporter, name)

  /**
    * Creates the entry (id + dedicated single-thread executor) for a metric reporter and stores it.
    *
    * NOTE(review): `reporter.start()` is not invoked anywhere in this class; confirm whether callers
    * are expected to start reporters themselves before registering them.
    */
  private def addMetricReporter(reporter: MetricReporter, name: String): Registration = synchronized {
    val executor = Executors.newSingleThreadExecutor(threadFactory(name))
    val reporterEntry = new MetricReporterEntry(
      id = reporterCounter.getAndIncrement(),
      reporter = reporter,
      executionContext = ExecutionContext.fromExecutorService(executor)
    )

    metricReporters.put(reporterEntry.id, reporterEntry)

    new Registration {
      // Cancelling also stops the reporter and releases its executor thread, mirroring stopAllReporters().
      override def cancel(): Boolean =
        metricReporters.remove(reporterEntry.id) match {
          case Some(removed) => stopMetricReporter(removed); true
          case None          => false
        }
    }
  }

  /**
    * Creates the entry (id + bounded span buffer + dedicated single-thread executor) for a span
    * reporter and stores it. See the start() note on [[addMetricReporter]]; the same applies here.
    */
  private def addSpanReporter(reporter: SpanReporter, name: String): Registration = synchronized {
    val executor = Executors.newSingleThreadExecutor(threadFactory(name))
    val reporterEntry = new SpanReporterEntry(
      id = reporterCounter.getAndIncrement(),
      reporter = reporter,
      bufferCapacity = 1024,
      executionContext = ExecutionContext.fromExecutorService(executor)
    )

    spanReporters.put(reporterEntry.id, reporterEntry)

    new Registration {
      // Cancelling also stops the reporter and releases its executor thread, mirroring stopAllReporters().
      override def cancel(): Boolean =
        spanReporters.remove(reporterEntry.id) match {
          case Some(removed) => stopSpanReporter(removed); true
          case None          => false
        }
    }
  }

  /**
    * Removes every registered reporter and asks each one to stop on its own executor.
    *
    * @return a future completed when all `stop()` calls have finished; it fails if any of them failed.
    */
  override def stopAllReporters(): Future[Unit] = {
    implicit val callbackContext: ExecutionContext = registryCallbackContext
    val reporterStopFutures = Vector.newBuilder[Future[Unit]]

    removeEach(metricReporters)(entry => reporterStopFutures += stopMetricReporter(entry))
    removeEach(spanReporters)(entry => reporterStopFutures += stopSpanReporter(entry))

    Future.sequence(reporterStopFutures.result()).map(_ => ())
  }

  /**
    * Removes entries one at a time until the map is empty, invoking `onRemoved` for each entry this
    * call actually removed. Uses `headOption` so that a concurrent removal between the emptiness
    * check and the lookup cannot raise `NoSuchElementException`.
    */
  private def removeEach[E](entries: TrieMap[Long, E])(onRemoved: E => Unit): Unit =
    Iterator
      .continually(entries.headOption)
      .takeWhile(_.isDefined)
      .foreach(_.foreach { case (id, _) => entries.remove(id).foreach(onRemoved) })

  /**
    * Cancels the current tickers, forwards the new configuration to every registered reporter (each
    * on its own executor) and schedules fresh tickers using the configured intervals.
    *
    * @param config configuration providing `kamon.metric.tick-interval` and `kamon.trace.tick-interval`.
    */
  private[kamon] def reconfigure(config: Config): Unit = synchronized {
    val tickIntervalMillis = config.getDuration("kamon.metric.tick-interval", TimeUnit.MILLISECONDS)
    val traceTickIntervalMillis = config.getDuration("kamon.trace.tick-interval", TimeUnit.MILLISECONDS)

    // The references are null until the first schedule has been set.
    Option(metricReporterTickerSchedule.get()).foreach(_.cancel(true))
    Option(spanReporterTickerSchedule.get()).foreach(_.cancel(true))

    // Reconfigure all registered reporters
    metricReporters.foreach { case (_, entry) =>
      logFailure("Error while reconfiguring a metric reporter") {
        Future(entry.reporter.reconfigure(config))(entry.executionContext)
      }
    }

    spanReporters.foreach { case (_, entry) =>
      logFailure("Error while reconfiguring a span reporter") {
        Future(entry.reporter.reconfigure(config))(entry.executionContext)
      }
    }

    metricReporterTickerSchedule.set {
      registryExecutionContext.scheduleAtFixedRate(
        new MetricReporterTicker(metrics, metricReporters), tickIntervalMillis, tickIntervalMillis, TimeUnit.MILLISECONDS
      )
    }

    spanReporterTickerSchedule.set {
      registryExecutionContext.scheduleAtFixedRate(
        new SpanReporterTicker(spanReporters), traceTickIntervalMillis, traceTickIntervalMillis, TimeUnit.MILLISECONDS
      )
    }
  }

  /**
    * Offers the span to the buffer of every active span reporter. `offer` does not block, so the
    * span is dropped for any reporter whose buffer is already full.
    */
  private[kamon] def reportSpan(span: Span.CompletedSpan): Unit = {
    spanReporters.foreach { case (_, reporterEntry) =>
      if(reporterEntry.isActive)
        reporterEntry.buffer.offer(span)
    }
  }

  /** Logs the failure of `future`, if any, instead of letting it vanish silently. */
  private def logFailure(description: String)(future: Future[_]): Unit =
    future.failed.foreach(error => logger.error(description, error))(registryCallbackContext)

  /** Marks the entry inactive, then stops the reporter and shuts down its executor. */
  private def stopMetricReporter(entry: MetricReporterEntry): Future[Unit] = {
    entry.isActive = false
    stopAndShutdown(entry.executionContext)(entry.reporter.stop())
  }

  /** Marks the entry inactive, then stops the reporter and shuts down its executor. */
  private def stopSpanReporter(entry: SpanReporterEntry): Future[Unit] = {
    entry.isActive = false
    stopAndShutdown(entry.executionContext)(entry.reporter.stop())
  }

  /**
    * Runs `stop` on the reporter's executor and shuts that executor down afterwards, whether or not
    * `stop` succeeded. The returned future carries the outcome of `stop`.
    */
  private def stopAndShutdown(reporterContext: ExecutionContextExecutorService)(stop: => Unit): Future[Unit] =
    Future(stop)(reporterContext).andThen {
      case _ => reporterContext.shutdown()
    }(registryCallbackContext)

  /** Bookkeeping for a registered metric reporter. */
  private class MetricReporterEntry(
    @volatile var isActive: Boolean = true,
    val id: Long,
    val reporter: MetricReporter,
    val executionContext: ExecutionContextExecutorService
  )

  /** Bookkeeping for a registered span reporter, including its bounded buffer of pending spans. */
  private class SpanReporterEntry(
    @volatile var isActive: Boolean = true,
    val id: Long,
    val reporter: SpanReporter,
    val bufferCapacity: Int,
    val executionContext: ExecutionContextExecutorService
  ) {
    val buffer = new ArrayBlockingQueue[Span.CompletedSpan](bufferCapacity)
  }

  /**
    * Periodic task that takes one metrics snapshot per tick and hands it to every active metric
    * reporter on that reporter's executor.
    */
  private class MetricReporterTicker(snapshotGenerator: MetricsSnapshotGenerator, reporterEntries: TrieMap[Long, MetricReporterEntry]) extends Runnable {
    val logger = Logger(classOf[MetricReporterTicker])
    var lastTick = System.currentTimeMillis()

    // Nothing may escape run(): an exception would suppress all later executions of the schedule.
    def run(): Unit = try {
      val currentTick = System.currentTimeMillis()
      val tickSnapshot = TickSnapshot(
        interval = Interval(lastTick, currentTick),
        metrics = snapshotGenerator.snapshot()
      )

      reporterEntries.foreach { case (_, entry) =>
        Future {
          // Checked at execution time so a reporter stopped after submission is not invoked.
          if(entry.isActive)
            try entry.reporter.reportTickSnapshot(tickSnapshot)
            catch { case NonFatal(t) => logger.error("Error while reporting a tick snapshot", t) }
        }(entry.executionContext)
      }

      lastTick = currentTick
    } catch {
      case NonFatal(t) => logger.error("Error while running a tick", t)
    }
  }

  /**
    * Periodic task that drains each span reporter's buffer (up to its capacity) and hands the batch
    * to the reporter on that reporter's executor.
    */
  private class SpanReporterTicker(spanReporters: TrieMap[Long, SpanReporterEntry]) extends Runnable {
    private val logger = Logger(classOf[SpanReporterTicker])

    // Nothing may escape run(): an exception would suppress all later executions of the schedule.
    override def run(): Unit = try {
      spanReporters.foreach { case (_, entry) =>
        val spanBatch = new java.util.ArrayList[Span.CompletedSpan](entry.bufferCapacity)
        entry.buffer.drainTo(spanBatch, entry.bufferCapacity)

        Future {
          // Checked at execution time so a reporter stopped after submission is not invoked.
          if(entry.isActive)
            try entry.reporter.reportSpans(spanBatch.asScala)
            catch { case NonFatal(t) => logger.error("Error while reporting spans", t) }
        }(entry.executionContext)
      }
    } catch {
      case NonFatal(t) => logger.error("Error while running a span tick", t)
    }
  }
}