diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2017-08-21 09:23:07 +0200 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2017-08-21 10:37:08 +0200 |
commit | a152a3098b564ed43766a857b32b7c7d7445f9ce (patch) | |
tree | 7651f61e598f316ee9dca415c5a5c67ce530bad5 /kamon-core/src/main/scala | |
parent | 3cb974e5dfd381b9b28ffef9977047cf35242121 (diff) | |
download | Kamon-a152a3098b564ed43766a857b32b7c7d7445f9ce.tar.gz Kamon-a152a3098b564ed43766a857b32b7c7d7445f9ce.tar.bz2 Kamon-a152a3098b564ed43766a857b32b7c7d7445f9ce.zip |
binary encoding of context and entries
Diffstat (limited to 'kamon-core/src/main/scala')
-rw-r--r-- | kamon-core/src/main/scala/kamon/Kamon.scala | 24 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/ReporterRegistry.scala | 425 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/context/Codec.scala | 144 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/context/Codecs.scala | 245 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/trace/Span.scala | 4 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/trace/SpanCodec.scala | 71 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/trace/Tracer.scala | 15 |
7 files changed, 549 insertions, 379 deletions
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index 2c0561e2..f251b1ec 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -24,7 +24,7 @@ import scala.concurrent.Future import java.time.Duration import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledThreadPoolExecutor} -import kamon.context.{Codec, Context, Storage} +import kamon.context.{Codecs, Context, Storage} import org.slf4j.LoggerFactory import scala.util.Try @@ -39,10 +39,10 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer { private val _scheduler = Executors.newScheduledThreadPool(schedulerPoolSize(_config), numberedThreadFactory("kamon-scheduler")) private val _metrics = new MetricRegistry(_config, _scheduler) - private val _reporters = new ReporterRegistryImpl(_metrics, _config) - private val _tracer = Tracer.Default(Kamon, _reporters, _config) + private val _reporterRegistry = new ReporterRegistry.Default(_metrics, _config) + private val _tracer = Tracer.Default(Kamon, _reporterRegistry, _config) private val _contextStorage = Storage.ThreadLocal() - private val _contextCodec = new Codec(_config) + private val _contextCodec = new Codecs(_config) private var _onReconfigureHooks = Seq.empty[OnReconfigureHook] def environment: Environment = @@ -56,7 +56,7 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer { _environment = Environment.fromConfig(config) _filters = Filters.fromConfig(config) _metrics.reconfigure(config) - _reporters.reconfigure(config) + _reporterRegistry.reconfigure(config) _tracer.reconfigure(config) _contextCodec.reconfigure(config) @@ -100,7 +100,7 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer { override def identityProvider: IdentityProvider = _tracer.identityProvider - def contextCodec(): Codec = + def contextCodec(): Codecs = _contextCodec def currentContext(): Context = @@ -120,22 
+120,22 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer { override def loadReportersFromConfig(): Unit = - _reporters.loadReportersFromConfig() + _reporterRegistry.loadReportersFromConfig() override def addReporter(reporter: MetricReporter): Registration = - _reporters.addReporter(reporter) + _reporterRegistry.addReporter(reporter) override def addReporter(reporter: MetricReporter, name: String): Registration = - _reporters.addReporter(reporter, name) + _reporterRegistry.addReporter(reporter, name) override def addReporter(reporter: SpanReporter): Registration = - _reporters.addReporter(reporter) + _reporterRegistry.addReporter(reporter) override def addReporter(reporter: SpanReporter, name: String): Registration = - _reporters.addReporter(reporter, name) + _reporterRegistry.addReporter(reporter, name) override def stopAllReporters(): Future[Unit] = - _reporters.stopAllReporters() + _reporterRegistry.stopAllReporters() def filter(filterName: String, pattern: String): Boolean = _filters.accept(filterName, pattern) diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala index f0d744e5..ff135f60 100644 --- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala +++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala @@ -33,6 +33,20 @@ import scala.util.control.NonFatal import scala.collection.JavaConverters._ import scala.collection.concurrent.TrieMap +sealed trait Reporter { + def start(): Unit + def stop(): Unit + def reconfigure(config: Config): Unit +} + +trait MetricReporter extends Reporter { + def reportTickSnapshot(snapshot: TickSnapshot): Unit +} + +trait SpanReporter extends Reporter { + def reportSpans(spans: Seq[Span.FinishedSpan]): Unit +} + trait ReporterRegistry { def loadReportersFromConfig(): Unit @@ -48,274 +62,261 @@ object ReporterRegistry { private[kamon] trait SpanSink { def reportSpan(finishedSpan: FinishedSpan): Unit } -} - -sealed trait Reporter { 
- def start(): Unit - def stop(): Unit - def reconfigure(config: Config): Unit -} -trait MetricReporter extends Reporter { - def reportTickSnapshot(snapshot: TickSnapshot): Unit -} - -trait SpanReporter extends Reporter { - def reportSpans(spans: Seq[Span.FinishedSpan]): Unit -} - -class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Config) extends ReporterRegistry with SpanSink { - private val logger = LoggerFactory.getLogger(classOf[ReporterRegistry]) - private val registryExecutionContext = Executors.newScheduledThreadPool(2, threadFactory("kamon-reporter-registry")) - private val reporterCounter = new AtomicLong(0L) - private var registryConfiguration = readRegistryConfiguration(initialConfig) - - private val metricReporters = TrieMap[Long, MetricReporterEntry]() - private val metricReporterTickerSchedule = new AtomicReference[ScheduledFuture[_]]() - private val spanReporters = TrieMap[Long, SpanReporterEntry]() - private val spanReporterTickerSchedule = new AtomicReference[ScheduledFuture[_]]() - - - reconfigure(initialConfig) - - override def loadReportersFromConfig(): Unit = { - if(registryConfiguration.configuredReporters.isEmpty) - logger.info("The kamon.reporters setting is empty, no reporters have been started.") - else { - registryConfiguration.configuredReporters.foreach { reporterFQCN => - val dynamicAccess = new DynamicAccess(getClass.getClassLoader) - dynamicAccess.createInstanceFor[Reporter](reporterFQCN, Nil).map({ - case mr: MetricReporter => - addMetricReporter(mr, "loaded-from-config: " + reporterFQCN) - logger.info("Loaded metric reporter [{}]", reporterFQCN) - - case sr: SpanReporter => - addSpanReporter(sr, "loaded-from-config: " + reporterFQCN) - logger.info("Loaded span reporter [{}]", reporterFQCN) - - }).failed.foreach { - t => logger.error(s"Failed to load configured reporter [$reporterFQCN]", t) + private[kamon] class Default(metrics: MetricsSnapshotGenerator, initialConfig: Config) extends ReporterRegistry 
with SpanSink { + private val logger = LoggerFactory.getLogger(classOf[ReporterRegistry]) + private val registryExecutionContext = Executors.newScheduledThreadPool(2, threadFactory("kamon-reporter-registry")) + private val reporterCounter = new AtomicLong(0L) + private var registryConfiguration = readRegistryConfiguration(initialConfig) + + private val metricReporters = TrieMap[Long, MetricReporterEntry]() + private val metricReporterTickerSchedule = new AtomicReference[ScheduledFuture[_]]() + private val spanReporters = TrieMap[Long, SpanReporterEntry]() + private val spanReporterTickerSchedule = new AtomicReference[ScheduledFuture[_]]() + + + reconfigure(initialConfig) + + override def loadReportersFromConfig(): Unit = { + if(registryConfiguration.configuredReporters.isEmpty) + logger.info("The kamon.reporters setting is empty, no reporters have been started.") + else { + registryConfiguration.configuredReporters.foreach { reporterFQCN => + val dynamicAccess = new DynamicAccess(getClass.getClassLoader) + dynamicAccess.createInstanceFor[Reporter](reporterFQCN, Nil).map({ + case mr: MetricReporter => + addMetricReporter(mr, "loaded-from-config: " + reporterFQCN) + logger.info("Loaded metric reporter [{}]", reporterFQCN) + + case sr: SpanReporter => + addSpanReporter(sr, "loaded-from-config: " + reporterFQCN) + logger.info("Loaded span reporter [{}]", reporterFQCN) + + }).failed.foreach { + t => logger.error(s"Failed to load configured reporter [$reporterFQCN]", t) + } } } } - } - override def addReporter(reporter: MetricReporter): Registration = - addMetricReporter(reporter, reporter.getClass.getName()) + override def addReporter(reporter: MetricReporter): Registration = + addMetricReporter(reporter, reporter.getClass.getName()) - override def addReporter(reporter: MetricReporter, name: String): Registration = - addMetricReporter(reporter, name) + override def addReporter(reporter: MetricReporter, name: String): Registration = + addMetricReporter(reporter, name) - 
override def addReporter(reporter: SpanReporter): Registration = - addSpanReporter(reporter, reporter.getClass.getName()) + override def addReporter(reporter: SpanReporter): Registration = + addSpanReporter(reporter, reporter.getClass.getName()) - override def addReporter(reporter: SpanReporter, name: String): Registration = - addSpanReporter(reporter, name) + override def addReporter(reporter: SpanReporter, name: String): Registration = + addSpanReporter(reporter, name) - private def addMetricReporter(reporter: MetricReporter, name: String): Registration = synchronized { - val executor = Executors.newSingleThreadExecutor(threadFactory(name)) - val reporterEntry = new MetricReporterEntry( - id = reporterCounter.getAndIncrement(), - name = name, - reporter = reporter, - executionContext = ExecutionContext.fromExecutorService(executor) - ) + private def addMetricReporter(reporter: MetricReporter, name: String): Registration = synchronized { + val executor = Executors.newSingleThreadExecutor(threadFactory(name)) + val reporterEntry = new MetricReporterEntry( + id = reporterCounter.getAndIncrement(), + name = name, + reporter = reporter, + executionContext = ExecutionContext.fromExecutorService(executor) + ) - Future(reporterEntry.reporter.start())(reporterEntry.executionContext) + Future(reporterEntry.reporter.start())(reporterEntry.executionContext) - if(metricReporters.isEmpty) - reStartMetricTicker() + if(metricReporters.isEmpty) + reStartMetricTicker() - metricReporters.put(reporterEntry.id, reporterEntry) - createRegistration(reporterEntry.id, metricReporters) + metricReporters.put(reporterEntry.id, reporterEntry) + createRegistration(reporterEntry.id, metricReporters) - } + } - private def addSpanReporter(reporter: SpanReporter, name: String): Registration = synchronized { - val executor = Executors.newSingleThreadExecutor(threadFactory(name)) - val reporterEntry = new SpanReporterEntry( - id = reporterCounter.incrementAndGet(), - name = name, - reporter = 
reporter, - bufferCapacity = registryConfiguration.traceReporterQueueSize, - executionContext = ExecutionContext.fromExecutorService(executor) - ) + private def addSpanReporter(reporter: SpanReporter, name: String): Registration = synchronized { + val executor = Executors.newSingleThreadExecutor(threadFactory(name)) + val reporterEntry = new SpanReporterEntry( + id = reporterCounter.incrementAndGet(), + name = name, + reporter = reporter, + bufferCapacity = registryConfiguration.traceReporterQueueSize, + executionContext = ExecutionContext.fromExecutorService(executor) + ) - Future(reporterEntry.reporter.start())(reporterEntry.executionContext) + Future(reporterEntry.reporter.start())(reporterEntry.executionContext) - if(spanReporters.isEmpty) - reStartTraceTicker() + if(spanReporters.isEmpty) + reStartTraceTicker() - spanReporters.put(reporterEntry.id, reporterEntry) - createRegistration(reporterEntry.id, spanReporters) - } + spanReporters.put(reporterEntry.id, reporterEntry) + createRegistration(reporterEntry.id, spanReporters) + } - private def createRegistration(id: Long, target: TrieMap[Long, _]): Registration = new Registration { - override def cancel(): Boolean = - target.remove(id).nonEmpty - } + private def createRegistration(id: Long, target: TrieMap[Long, _]): Registration = new Registration { + override def cancel(): Boolean = + target.remove(id).nonEmpty + } - override def stopAllReporters(): Future[Unit] = { - implicit val stopReporterExeContext = ExecutionContext.fromExecutor(registryExecutionContext) - val reporterStopFutures = Vector.newBuilder[Future[Unit]] + override def stopAllReporters(): Future[Unit] = { + implicit val stopReporterExeContext = ExecutionContext.fromExecutor(registryExecutionContext) + val reporterStopFutures = Vector.newBuilder[Future[Unit]] - while(metricReporters.nonEmpty) { - val (idToRemove, _) = metricReporters.head - metricReporters.remove(idToRemove).foreach { entry => - reporterStopFutures += stopMetricReporter(entry) + 
while(metricReporters.nonEmpty) { + val (idToRemove, _) = metricReporters.head + metricReporters.remove(idToRemove).foreach { entry => + reporterStopFutures += stopMetricReporter(entry) + } } - } - while(spanReporters.nonEmpty) { - val (idToRemove, _) = spanReporters.head - spanReporters.remove(idToRemove).foreach { entry => - reporterStopFutures += stopSpanReporter(entry) + while(spanReporters.nonEmpty) { + val (idToRemove, _) = spanReporters.head + spanReporters.remove(idToRemove).foreach { entry => + reporterStopFutures += stopSpanReporter(entry) + } } - } - Future.sequence(reporterStopFutures.result()).map(_ => Try((): Unit)) - } + Future.sequence(reporterStopFutures.result()).map(_ => Try((): Unit)) + } - private[kamon] def reconfigure(config: Config): Unit = synchronized { - val newConfig = readRegistryConfiguration(config) + private[kamon] def reconfigure(config: Config): Unit = synchronized { + val newConfig = readRegistryConfiguration(config) - if(newConfig.metricTickInterval != registryConfiguration.metricTickInterval && metricReporters.nonEmpty) - reStartMetricTicker() + if(newConfig.metricTickInterval != registryConfiguration.metricTickInterval && metricReporters.nonEmpty) + reStartMetricTicker() - if(newConfig.traceTickInterval != registryConfiguration.metricTickInterval && spanReporters.nonEmpty) - reStartTraceTicker() + if(newConfig.traceTickInterval != registryConfiguration.metricTickInterval && spanReporters.nonEmpty) + reStartTraceTicker() - // Reconfigure all registered reporters - metricReporters.foreach { case (_, entry) => Future(entry.reporter.reconfigure(config))(entry.executionContext) } - spanReporters.foreach { case (_, entry) => Future(entry.reporter.reconfigure(config))(entry.executionContext) } - registryConfiguration = newConfig - } + // Reconfigure all registered reporters + metricReporters.foreach { case (_, entry) => Future(entry.reporter.reconfigure(config))(entry.executionContext) } + spanReporters.foreach { case (_, entry) => 
Future(entry.reporter.reconfigure(config))(entry.executionContext) } + registryConfiguration = newConfig + } - private def reStartMetricTicker(): Unit = { - val tickIntervalMillis = registryConfiguration.metricTickInterval.toMillis - val currentMetricTicker = metricReporterTickerSchedule.get() + private def reStartMetricTicker(): Unit = { + val tickIntervalMillis = registryConfiguration.metricTickInterval.toMillis + val currentMetricTicker = metricReporterTickerSchedule.get() - if(currentMetricTicker != null) - currentMetricTicker.cancel(false) + if(currentMetricTicker != null) + currentMetricTicker.cancel(false) - metricReporterTickerSchedule.set { - registryExecutionContext.scheduleAtFixedRate( - new MetricReporterTicker(metrics, metricReporters), tickIntervalMillis, tickIntervalMillis, TimeUnit.MILLISECONDS - ) + metricReporterTickerSchedule.set { + registryExecutionContext.scheduleAtFixedRate( + new MetricReporterTicker(metrics, metricReporters), tickIntervalMillis, tickIntervalMillis, TimeUnit.MILLISECONDS + ) + } } - } - private def reStartTraceTicker(): Unit = { - val tickIntervalMillis = registryConfiguration.traceTickInterval.toMillis - val currentSpanTicker = spanReporterTickerSchedule.get() - if(currentSpanTicker != null) - currentSpanTicker.cancel(false) + private def reStartTraceTicker(): Unit = { + val tickIntervalMillis = registryConfiguration.traceTickInterval.toMillis + val currentSpanTicker = spanReporterTickerSchedule.get() + if(currentSpanTicker != null) + currentSpanTicker.cancel(false) - spanReporterTickerSchedule.set { - registryExecutionContext.scheduleAtFixedRate( - new SpanReporterTicker(spanReporters), tickIntervalMillis, tickIntervalMillis, TimeUnit.MILLISECONDS - ) + spanReporterTickerSchedule.set { + registryExecutionContext.scheduleAtFixedRate( + new SpanReporterTicker(spanReporters), tickIntervalMillis, tickIntervalMillis, TimeUnit.MILLISECONDS + ) + } } - } - def reportSpan(span: Span.FinishedSpan): Unit = { - spanReporters.foreach 
{ case (_, reporterEntry) => - if(reporterEntry.isActive) - reporterEntry.buffer.offer(span) + def reportSpan(span: Span.FinishedSpan): Unit = { + spanReporters.foreach { case (_, reporterEntry) => + if(reporterEntry.isActive) + reporterEntry.buffer.offer(span) + } } - } - private def stopMetricReporter(entry: MetricReporterEntry): Future[Unit] = { - entry.isActive = false + private def stopMetricReporter(entry: MetricReporterEntry): Future[Unit] = { + entry.isActive = false - Future(entry.reporter.stop())(entry.executionContext).andThen { - case _ => entry.executionContext.shutdown() - }(ExecutionContext.fromExecutor(registryExecutionContext)) - } + Future(entry.reporter.stop())(entry.executionContext).andThen { + case _ => entry.executionContext.shutdown() + }(ExecutionContext.fromExecutor(registryExecutionContext)) + } - private def stopSpanReporter(entry: SpanReporterEntry): Future[Unit] = { - entry.isActive = false + private def stopSpanReporter(entry: SpanReporterEntry): Future[Unit] = { + entry.isActive = false - Future(entry.reporter.stop())(entry.executionContext).andThen { - case _ => entry.executionContext.shutdown() - }(ExecutionContext.fromExecutor(registryExecutionContext)) - } + Future(entry.reporter.stop())(entry.executionContext).andThen { + case _ => entry.executionContext.shutdown() + }(ExecutionContext.fromExecutor(registryExecutionContext)) + } - private class MetricReporterEntry( - @volatile var isActive: Boolean = true, - val id: Long, - val name: String, - val reporter: MetricReporter, - val executionContext: ExecutionContextExecutorService - ) - - private class SpanReporterEntry( - @volatile var isActive: Boolean = true, - val id: Long, - val name: String, - val reporter: SpanReporter, - val bufferCapacity: Int, - val executionContext: ExecutionContextExecutorService - ) { - val buffer = new ArrayBlockingQueue[Span.FinishedSpan](bufferCapacity) - } + private class MetricReporterEntry( + @volatile var isActive: Boolean = true, + val id: 
Long, + val name: String, + val reporter: MetricReporter, + val executionContext: ExecutionContextExecutorService + ) - private class MetricReporterTicker(snapshotGenerator: MetricsSnapshotGenerator, reporterEntries: TrieMap[Long, MetricReporterEntry]) extends Runnable { - val logger = LoggerFactory.getLogger(classOf[MetricReporterTicker]) - var lastTick = System.currentTimeMillis() + private class SpanReporterEntry( + @volatile var isActive: Boolean = true, + val id: Long, + val name: String, + val reporter: SpanReporter, + val bufferCapacity: Int, + val executionContext: ExecutionContextExecutorService + ) { + val buffer = new ArrayBlockingQueue[Span.FinishedSpan](bufferCapacity) + } - def run(): Unit = try { - val currentTick = System.currentTimeMillis() - val tickSnapshot = TickSnapshot( - interval = Interval(lastTick, currentTick), - metrics = snapshotGenerator.snapshot() - ) + private class MetricReporterTicker(snapshotGenerator: MetricsSnapshotGenerator, reporterEntries: TrieMap[Long, MetricReporterEntry]) extends Runnable { + val logger = LoggerFactory.getLogger(classOf[MetricReporterTicker]) + var lastTick = System.currentTimeMillis() - reporterEntries.foreach { case (_, entry) => - Future { - Try { - if (entry.isActive) - entry.reporter.reportTickSnapshot(tickSnapshot) + def run(): Unit = try { + val currentTick = System.currentTimeMillis() + val tickSnapshot = TickSnapshot( + interval = Interval(lastTick, currentTick), + metrics = snapshotGenerator.snapshot() + ) - }.failed.foreach { error => - logger.error(s"Reporter [${entry.name}] failed to process a metrics tick.", error) - } + reporterEntries.foreach { case (_, entry) => + Future { + Try { + if (entry.isActive) + entry.reporter.reportTickSnapshot(tickSnapshot) - }(entry.executionContext) - } + }.failed.foreach { error => + logger.error(s"Reporter [${entry.name}] failed to process a metrics tick.", error) + } - lastTick = currentTick + }(entry.executionContext) + } - } catch { - case NonFatal(t) => 
logger.error("Error while running a tick", t) + lastTick = currentTick + + } catch { + case NonFatal(t) => logger.error("Error while running a tick", t) + } } - } - private class SpanReporterTicker(spanReporters: TrieMap[Long, SpanReporterEntry]) extends Runnable { - override def run(): Unit = { - spanReporters.foreach { - case (_, entry) => + private class SpanReporterTicker(spanReporters: TrieMap[Long, SpanReporterEntry]) extends Runnable { + override def run(): Unit = { + spanReporters.foreach { + case (_, entry) => - val spanBatch = new java.util.ArrayList[Span.FinishedSpan](entry.bufferCapacity) - entry.buffer.drainTo(spanBatch, entry.bufferCapacity) + val spanBatch = new java.util.ArrayList[Span.FinishedSpan](entry.bufferCapacity) + entry.buffer.drainTo(spanBatch, entry.bufferCapacity) - Future { - entry.reporter.reportSpans(spanBatch.asScala) - }(entry.executionContext) + Future { + entry.reporter.reportSpans(spanBatch.asScala) + }(entry.executionContext) + } } } - } - private def readRegistryConfiguration(config: Config): Configuration = - Configuration( - metricTickInterval = config.getDuration("kamon.metric.tick-interval"), - traceTickInterval = config.getDuration("kamon.trace.tick-interval"), - traceReporterQueueSize = config.getInt("kamon.trace.reporter-queue-size"), - configuredReporters = config.getStringList("kamon.reporters").asScala - ) + private def readRegistryConfiguration(config: Config): Configuration = + Configuration( + metricTickInterval = config.getDuration("kamon.metric.tick-interval"), + traceTickInterval = config.getDuration("kamon.trace.tick-interval"), + traceReporterQueueSize = config.getInt("kamon.trace.reporter-queue-size"), + configuredReporters = config.getStringList("kamon.reporters").asScala + ) + + private case class Configuration(metricTickInterval: Duration, traceTickInterval: Duration, + traceReporterQueueSize: Int, configuredReporters: Seq[String]) + } +} - private case class Configuration(metricTickInterval: Duration, 
traceTickInterval: Duration, - traceReporterQueueSize: Int, configuredReporters: Seq[String]) -}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/context/Codec.scala b/kamon-core/src/main/scala/kamon/context/Codec.scala deleted file mode 100644 index 19ec2a20..00000000 --- a/kamon-core/src/main/scala/kamon/context/Codec.scala +++ /dev/null @@ -1,144 +0,0 @@ -/* ========================================================================================= - * Copyright © 2013-2017 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. 
- * ========================================================================================= - */ - -package kamon -package context - -import com.typesafe.config.Config -import kamon.util.DynamicAccess -import org.slf4j.LoggerFactory - -import scala.collection.mutable - -class Codec(initialConfig: Config) { - private val log = LoggerFactory.getLogger(classOf[Codec]) - - @volatile private var httpHeaders: Codec.ForContext[TextMap] = new Codec.HttpHeaders(Map.empty) - //val Binary: Codec.ForContext[ByteBuffer] = _ - reconfigure(initialConfig) - - - def HttpHeaders: Codec.ForContext[TextMap] = - httpHeaders - - def reconfigure(config: Config): Unit = { - httpHeaders = new Codec.HttpHeaders(readEntryCodecs("kamon.context.encoding.http-headers", config)) - } - - private def readEntryCodecs[T](rootKey: String, config: Config): Map[String, Codec.ForEntry[T]] = { - val rootConfig = config.getConfig(rootKey) - val dynamic = new DynamicAccess(getClass.getClassLoader) - val entries = Map.newBuilder[String, Codec.ForEntry[T]] - - rootConfig.topLevelKeys.foreach(key => { - try { - val fqcn = rootConfig.getString(key) - entries += ((key, dynamic.createInstanceFor[Codec.ForEntry[T]](fqcn, Nil).get)) - } catch { - case e: Throwable => - log.error(s"Failed to initialize codec for key [$key]", e) - } - }) - - entries.result() - } -} - -object Codec { - - trait ForContext[T] { - def encode(context: Context): T - def decode(carrier: T): Context - } - - trait ForEntry[T] { - def encode(context: Context): T - def decode(carrier: T, context: Context): Context - } - - final class HttpHeaders(entryCodecs: Map[String, Codec.ForEntry[TextMap]]) extends Codec.ForContext[TextMap] { - private val log = LoggerFactory.getLogger(classOf[HttpHeaders]) - - override def encode(context: Context): TextMap = { - val encoded = TextMap.Default() - - context.entries.foreach { - case (key, _) if key.broadcast => - entryCodecs.get(key.name) match { - case Some(codec) => - try { - 
codec.encode(context).values.foreach(pair => encoded.put(pair._1, pair._2)) - } catch { - case e: Throwable => log.error(s"Failed to encode key [${key.name}]", e) - } - - case None => - log.error("Context key [{}] should be encoded in HttpHeaders but no codec was found for it.", key.name) - } - } - - encoded - } - - override def decode(carrier: TextMap): Context = { - var context: Context = Context.Empty - - try { - context = entryCodecs.foldLeft(context)((ctx, codecEntry) => { - val (_, codec) = codecEntry - codec.decode(carrier, ctx) - }) - - } catch { - case e: Throwable => - log.error("Failed to decode context from HttpHeaders", e) - } - - context - } - } -} - - -trait TextMap { - - def get(key: String): Option[String] - - def put(key: String, value: String): Unit - - def values: Iterator[(String, String)] -} - -object TextMap { - - class Default extends TextMap { - private val storage = - mutable.Map.empty[String, String] - - override def get(key: String): Option[String] = - storage.get(key) - - override def put(key: String, value: String): Unit = - storage.put(key, value) - - override def values: Iterator[(String, String)] = - storage.toIterator - } - - object Default { - def apply(): Default = - new Default() - } -}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/context/Codecs.scala b/kamon-core/src/main/scala/kamon/context/Codecs.scala new file mode 100644 index 00000000..b50e991d --- /dev/null +++ b/kamon-core/src/main/scala/kamon/context/Codecs.scala @@ -0,0 +1,245 @@ +/* ========================================================================================= + * Copyright © 2013-2017 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon +package context + +import java.nio.ByteBuffer + +import com.typesafe.config.Config +import kamon.util.DynamicAccess +import org.slf4j.LoggerFactory +import kamon.context.generated.binary.context.{Context => ColferContext, Entry => ColferEntry} + +import scala.collection.mutable + +class Codecs(initialConfig: Config) { + private val log = LoggerFactory.getLogger(classOf[Codecs]) + @volatile private var httpHeaders: Codecs.ForContext[TextMap] = new Codecs.HttpHeaders(Map.empty) + @volatile private var binary: Codecs.ForContext[ByteBuffer] = new Codecs.Binary(256, Map.empty) + + reconfigure(initialConfig) + + + def HttpHeaders: Codecs.ForContext[TextMap] = + httpHeaders + + def Binary: Codecs.ForContext[ByteBuffer] = + binary + + def reconfigure(config: Config): Unit = { + try { + val codecsConfig = config.getConfig("kamon.context.codecs") + httpHeaders = new 
// ============================================================================
// kamon/context/Codecs.scala — reconstructed from a collapsed diff view.
// ============================================================================

// NOTE(review): this chunk opens part-way through `Codecs.reconfigure`; the
// method head lies before the visible span and is NOT reconstructed here.
// Its visible tail (verbatim) initializes the http-headers and binary codecs
// and logs any initialization failure:
//
//       ... = Codecs.HttpHeaders(readEntryCodecs("http-headers-keys", codecsConfig))
//       binary = new Codecs.Binary(codecsConfig.getBytes("binary-buffer-size"), readEntryCodecs("binary-keys", codecsConfig))
//     } catch {
//       case t: Throwable => log.error("Failed to initialize Context Codecs", t)
//     }
//   }

  /** Instantiates one [[Codecs.ForEntry]] per key found under `rootKey` in the
    * given configuration. Each configuration value is the fully-qualified class
    * name of a codec, created reflectively with a no-argument constructor.
    *
    * A key whose codec cannot be instantiated is logged and dropped, so one bad
    * entry does not disable the remaining codecs.
    */
  private def readEntryCodecs[T](rootKey: String, config: Config): Map[String, Codecs.ForEntry[T]] = {
    val rootConfig = config.getConfig(rootKey)
    val dynamic = new DynamicAccess(getClass.getClassLoader)
    val codecs = Map.newBuilder[String, Codecs.ForEntry[T]]

    for (key <- rootConfig.topLevelKeys) {
      try {
        val fqcn = rootConfig.getString(key)
        codecs += (key -> dynamic.createInstanceFor[Codecs.ForEntry[T]](fqcn, Nil).get)
      } catch {
        case e: Throwable =>
          log.error(s"Failed to initialize codec for key [$key]", e)
      }
    }

    codecs.result()
  }
} // end of class Codecs (class header lies before this chunk)

object Codecs {

  /** Encodes/decodes a whole [[Context]] to/from a transport carrier of type T. */
  trait ForContext[T] {
    def encode(context: Context): T
    def decode(carrier: T): Context
  }

  /** Encodes/decodes a single context entry. `decode` enriches the supplied
    * context rather than building a fresh one. */
  trait ForEntry[T] {
    def encode(context: Context): T
    def decode(carrier: T, context: Context): Context
  }

  /** Context codec for HTTP-header style carriers (string key/value pairs).
    * Only entries whose key is marked `broadcast` are propagated; everything
    * else stays local to the process. */
  final class HttpHeaders(entryCodecs: Map[String, Codecs.ForEntry[TextMap]]) extends Codecs.ForContext[TextMap] {
    private val log = LoggerFactory.getLogger(classOf[HttpHeaders])

    override def encode(context: Context): TextMap = {
      val carrier = TextMap.Default()

      for ((key, _) <- context.entries if key.broadcast) {
        entryCodecs.get(key.name) match {
          case Some(codec) =>
            // A failing entry codec is logged and skipped; the remaining
            // entries are still written to the carrier.
            try codec.encode(context).values.foreach { case (name, value) => carrier.put(name, value) }
            catch {
              case e: Throwable => log.error(s"Failed to encode key [${key.name}]", e)
            }

          case None =>
            log.error("Context key [{}] should be encoded in HttpHeaders but no codec was found for it.", key.name)
        }
      }

      carrier
    }

    override def decode(carrier: TextMap): Context = {
      // Each entry codec gets a chance to read its headers; a failure anywhere
      // in the fold discards partial results and yields the empty context.
      try {
        entryCodecs.values.foldLeft(Context.Empty: Context)((ctx, codec) => codec.decode(carrier, ctx))
      } catch {
        case e: Throwable =>
          log.error("Failed to decode context from HttpHeaders", e)
          Context.Empty
      }
    }
  }

  /** Context codec for binary carriers, built on Colfer-generated messages.
    * A thread-local scratch array of `bufferSize` bytes is reused across
    * encodings to avoid a per-call allocation. */
  final class Binary(bufferSize: Long, entryCodecs: Map[String, Codecs.ForEntry[ByteBuffer]]) extends Codecs.ForContext[ByteBuffer] {
    private val log = LoggerFactory.getLogger(classOf[Binary])
    private val binaryBuffer = newThreadLocalBuffer(bufferSize)
    private val emptyBuffer = ByteBuffer.allocate(0)

    override def encode(context: Context): ByteBuffer = {
      val contextEntries = context.entries
      if (contextEntries.isEmpty)
        emptyBuffer
      else {
        // Entries are prepended as they are encoded, so the marshalled array
        // ends up in reverse iteration order.
        var encodedEntries: List[ColferEntry] = Nil
        contextEntries.foreach {
          case (key, _) if key.broadcast =>
            entryCodecs.get(key.name) match {
              case Some(entryCodec) =>
                try {
                  val entryData = entryCodec.encode(context)
                  if (entryData.capacity() > 0) {
                    val entry = new ColferEntry()
                    entry.setName(key.name)
                    // NOTE(review): assumes the returned buffer is array-backed
                    // and fully occupied by the encoded bytes — confirm against
                    // the entry codec implementations.
                    entry.setContent(entryData.array())
                    encodedEntries = entry :: encodedEntries
                  }
                } catch {
                  case throwable: Throwable =>
                    log.error(s"Failed to encode broadcast context key [${key.name}]", throwable)
                }

              case None =>
                log.error("Failed to encode broadcast context key [{}]. No codec found.", key.name)
            }

          case _ => // Non-broadcast keys are never propagated.
        }

        if (encodedEntries.isEmpty)
          emptyBuffer
        else {
          val scratch = binaryBuffer.get()
          val colferContext = new ColferContext()
          colferContext.setEntries(encodedEntries.toArray)
          val marshalledSize = colferContext.marshal(scratch, 0)

          // Copy out of the reusable scratch buffer so the returned ByteBuffer
          // stays valid after the next encode on this thread.
          val data = Array.ofDim[Byte](marshalledSize)
          System.arraycopy(scratch, 0, data, 0, marshalledSize)
          ByteBuffer.wrap(data)
        }
      }
    }

    override def decode(carrier: ByteBuffer): Context = {
      if (carrier.capacity() == 0)
        Context.Empty
      else {
        var decoded: Context = Context.Empty

        try {
          val colferContext = new ColferContext()
          // NOTE(review): assumes the carrier is array-backed with offset 0 —
          // confirm with the transports that produce it.
          colferContext.unmarshal(carrier.array(), 0)

          colferContext.entries.foreach { colferEntry =>
            entryCodecs.get(colferEntry.getName()) match {
              case Some(entryCodec) =>
                decoded = entryCodec.decode(ByteBuffer.wrap(colferEntry.content), decoded)

              case None =>
                log.error("Failed to decode entry [{}] with Binary context codec. No entry found for the key.", colferEntry.getName())
            }
          }

        } catch {
          case e: Throwable =>
            log.error("Failed to decode context from Binary", e)
        }

        decoded
      }
    }

    // One scratch array per thread, sized from configuration; presumably an
    // encoding larger than `size` fails inside `marshal` — TODO confirm.
    private def newThreadLocalBuffer(size: Long): ThreadLocal[Array[Byte]] = new ThreadLocal[Array[Byte]] {
      override def initialValue(): Array[Byte] = Array.ofDim[Byte](size.toInt)
    }
  }
}

/** Minimal mutable string key/value carrier used by the HTTP-header codec. */
trait TextMap {

  def get(key: String): Option[String]

  def put(key: String, value: String): Unit

  def values: Iterator[(String, String)]
}

object TextMap {

  /** Default in-memory implementation backed by a mutable map. */
  class Default extends TextMap {
    private val storage =
      mutable.Map.empty[String, String]

    override def get(key: String): Option[String] =
      storage.get(key)

    override def put(key: String, value: String): Unit =
      storage.put(key, value)

    override def values: Iterator[(String, String)] =
      storage.iterator
  }

  object Default {
    def apply(): Default =
      new Default()
  }
}
// ============================================================================
// Reconstructed from a collapsed diff view. Two files are touched in this
// span: kamon/trace/Span.scala (one hunk) and kamon/trace/SpanCodec.scala.
// ============================================================================

// --- kamon/trace/Span.scala: `Local.apply` now threads a SpanSink instead of
// the concrete ReporterRegistryImpl. The enclosing `object Span` lies outside
// this chunk.
object Local {
  def apply(spanContext: SpanContext, parent: Option[Span], initialOperationName: String,
      initialSpanTags: Map[String, Span.TagValue], initialMetricTags: Map[String, String],
      startTimestampMicros: Long, spanSink: SpanSink, scopeSpanMetrics: Boolean): Local =
    new Local(spanContext, parent, initialOperationName, initialSpanTags, initialMetricTags,
      startTimestampMicros, spanSink, scopeSpanMetrics)
}

// --- kamon/trace/SpanCodec.scala: the B3 text codec (its body lies outside
// this chunk) now implements Codecs.ForEntry[TextMap]; the Colfer binary
// codec below is the new addition.

/** Binary codec for the Span context entry, backed by the Colfer-generated
  * [[ColferSpan]] message. Encodes trace/span/parent identifiers plus the
  * sampling decision. */
class Colfer extends Codecs.ForEntry[ByteBuffer] {
  // Shared sentinel meaning "no span to propagate".
  val emptyBuffer = ByteBuffer.allocate(0)

  override def encode(context: Context): ByteBuffer = {
    val span = context.get(Span.ContextKey)
    if (span.nonEmpty()) {
      val scratch = Colfer.codecBuffer.get()
      val colferSpan = new ColferSpan()
      val spanContext = span.context()

      colferSpan.setTraceID(spanContext.traceID.bytes)
      colferSpan.setSpanID(spanContext.spanID.bytes)
      colferSpan.setParentID(spanContext.parentID.bytes)
      colferSpan.setSamplingDecision(samplingDecisionToByte(spanContext.samplingDecision))

      // NOTE(review): assumes a marshalled span never exceeds the 256-byte
      // thread-local scratch buffer — confirm against ColferSpan.marshal.
      val marshalledSize = colferSpan.marshal(scratch, 0)
      val encoded = ByteBuffer.allocate(marshalledSize)
      encoded.put(scratch, 0, marshalledSize)
      encoded

    } else emptyBuffer
  }

  override def decode(carrier: ByteBuffer, context: Context): Context = {
    // Reset position/limit before reading; the carrier may have been consumed.
    carrier.clear()

    if (carrier.capacity() == 0)
      context
    else {
      val identityProvider = Kamon.tracer.identityProvider
      val colferSpan = new ColferSpan()
      // NOTE(review): assumes an array-backed carrier with offset 0 — confirm.
      colferSpan.unmarshal(carrier.array(), 0)

      val spanContext = SpanContext(
        traceID = identityProvider.traceIdGenerator().from(colferSpan.traceID),
        spanID = identityProvider.traceIdGenerator().from(colferSpan.spanID),
        parentID = identityProvider.traceIdGenerator().from(colferSpan.parentID),
        samplingDecision = byteToSamplingDecision(colferSpan.samplingDecision)
      )

      context.withKey(Span.ContextKey, Span.Remote(spanContext))
    }
  }

  // Wire values: 1 = sample, 2 = do not sample, 3/other = unknown. The two
  // functions below must stay inverse to each other.
  private def samplingDecisionToByte(samplingDecision: SamplingDecision): Byte =
    samplingDecision match {
      case SamplingDecision.Sample      => 1
      case SamplingDecision.DoNotSample => 2
      case SamplingDecision.Unknown     => 3
    }

  private def byteToSamplingDecision(byte: Byte): SamplingDecision =
    byte match {
      case 1 => SamplingDecision.Sample
      case 2 => SamplingDecision.DoNotSample
      case _ => SamplingDecision.Unknown
    }
}

object Colfer {
  // One 256-byte scratch array per thread, reused across encodings; `encode`
  // always copies the marshalled bytes out before returning.
  private val codecBuffer = new ThreadLocal[Array[Byte]] {
    override def initialValue(): Array[Byte] = Array.ofDim[Byte](256)
  }
}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala index f2f1918c..5f61f3aa 100644 --- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala +++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala @@ -16,7 +16,8 @@ package kamon.trace import com.typesafe.config.Config -import kamon.{Kamon, ReporterRegistryImpl} +import kamon.ReporterRegistry.SpanSink +import kamon.Kamon import kamon.metric.MetricLookup import kamon.trace.Span.TagValue import kamon.trace.SpanContext.SamplingDecision @@ -34,7 +35,7 @@ trait Tracer { object Tracer { - final class Default(metrics: MetricLookup, reporterRegistry: ReporterRegistryImpl, initialConfig: Config) extends Tracer { + final class Default(metrics: MetricLookup, spanSink: SpanSink, initialConfig: Config) extends Tracer { private val logger = LoggerFactory.getLogger(classOf[Tracer]) private[Tracer] val tracerMetrics = new TracerMetrics(metrics) @@ -46,7 +47,7 @@ object Tracer { reconfigure(initialConfig) override def buildSpan(operationName: String): SpanBuilder = - new SpanBuilder(operationName, this, reporterRegistry) + new SpanBuilder(operationName, this, spanSink) override def identityProvider: IdentityProvider = this._identityProvider @@ -84,11 +85,11 @@ object Tracer { } object Default { - def apply(metrics: MetricLookup, reporterRegistry: ReporterRegistryImpl, initialConfig: Config): Default = - new Default(metrics, reporterRegistry, initialConfig) + def apply(metrics: MetricLookup, spanSink: SpanSink, initialConfig: Config): Default = + new Default(metrics, spanSink, initialConfig) } - final class SpanBuilder(operationName: String, tracer: Tracer.Default, reporterRegistry: ReporterRegistryImpl) { + final class SpanBuilder(operationName: String, tracer: Tracer.Default, spanSink: SpanSink) { private var parentSpan: Span = _ private var startTimestamp = 0L private var initialSpanTags = Map.empty[String, Span.TagValue] @@ -157,7 +158,7 
@@ object Tracer { } tracer.tracerMetrics.createdSpans.increment() - Span.Local(spanContext, nonRemoteParent, operationName, initialSpanTags, initialMetricTags, startTimestampMicros, reporterRegistry, tracer.scopeSpanMetrics) + Span.Local(spanContext, nonRemoteParent, operationName, initialSpanTags, initialMetricTags, startTimestampMicros, spanSink, tracer.scopeSpanMetrics) } private def joinParentContext(parent: Span, samplingDecision: SamplingDecision): SpanContext = |