aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2017-05-18 16:21:44 +0200
committerIvan Topolnjak <ivantopo@gmail.com>2017-05-18 16:21:44 +0200
commit77f2666650726352a9e15dcf6019064d91393b2e (patch)
treec39f7e2a18ac6bb1fcd1e2cc73dd3c165919515e /kamon-core/src/main/scala/kamon/ReporterRegistry.scala
parent5dee54a0794b282e9b5729a3d4b85478c12a68d1 (diff)
downloadKamon-77f2666650726352a9e15dcf6019064d91393b2e.tar.gz
Kamon-77f2666650726352a9e15dcf6019064d91393b2e.tar.bz2
Kamon-77f2666650726352a9e15dcf6019064d91393b2e.zip
some more wip
Diffstat (limited to 'kamon-core/src/main/scala/kamon/ReporterRegistry.scala')
-rw-r--r--kamon-core/src/main/scala/kamon/ReporterRegistry.scala105
1 files changed, 40 insertions, 65 deletions
diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
index 09c980f6..b42c5abe 100644
--- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
+++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
@@ -6,6 +6,7 @@ import java.util.concurrent._
import com.typesafe.config.Config
import kamon.metric._
+import kamon.trace.Span
import org.slf4j.LoggerFactory
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}
@@ -14,15 +15,40 @@ import scala.util.control.NonFatal
trait ReporterRegistry {
def loadFromConfig(): Unit
+
def add(reporter: MetricsReporter): Registration
def add(reporter: MetricsReporter, name: String): Registration
+ def add(reporter: SpansReporter): Registration
+
def stopAll(): Future[Unit]
}
+
+trait Registration {
+ def cancel(): Boolean
+}
+
+trait MetricsReporter {
+ def start(config: Config): Unit
+ def reconfigure(config: Config): Unit
+ def stop(): Unit
+
+ def reportTickSnapshot(snapshot: TickSnapshot)
+}
+
+trait SpansReporter {
+ def start(config: Config): Unit
+ def reconfigure(config: Config): Unit
+ def stop(): Unit
+
+ def reportSpan(span: Span.CompletedSpan): Unit
+}
+
class ReporterRegistryImpl(metrics: RecorderRegistryImpl, initialConfig: Config) extends ReporterRegistry {
private val registryExecutionContext = Executors.newSingleThreadScheduledExecutor(threadFactory("kamon-reporter-registry"))
private val metricsTickerSchedule = new AtomicReference[ScheduledFuture[_]]()
private val metricReporters = new ConcurrentLinkedQueue[ReporterEntry]()
+ private val spanReporters = new ConcurrentLinkedQueue[SpansReporter]()
private val reporterCounter = new AtomicLong(0L)
reconfigure(initialConfig)
@@ -55,6 +81,14 @@ class ReporterRegistryImpl(metrics: RecorderRegistryImpl, initialConfig: Config)
}
}
+ override def add(reporter: SpansReporter): Registration = {
+ spanReporters.add(reporter)
+
+ new Registration {
+ override def cancel(): Boolean = true
+ }
+ }
+
override def stopAll(): Future[Unit] = {
implicit val stopReporterExeContext = ExecutionContext.fromExecutor(registryExecutionContext)
val reporterStopFutures = Vector.newBuilder[Future[Unit]]
@@ -87,6 +121,11 @@ class ReporterRegistryImpl(metrics: RecorderRegistryImpl, initialConfig: Config)
}
}
+
+ private[kamon] def reportSpan(span: Span.CompletedSpan): Unit = {
+ spanReporters.forEach(_.reportSpan(span))
+ }
+
private def stopReporter(entry: ReporterEntry): Future[Unit] = {
entry.isActive = false
@@ -131,7 +170,7 @@ class ReporterRegistryImpl(metrics: RecorderRegistryImpl, initialConfig: Config)
reporterEntries.forEach { entry =>
Future {
if(entry.isActive)
- entry.reporter.processTick(tickSnapshot)
+ entry.reporter.reportTickSnapshot(tickSnapshot)
}(executor = entry.executionContext)
}
@@ -142,68 +181,4 @@ class ReporterRegistryImpl(metrics: RecorderRegistryImpl, initialConfig: Config)
case NonFatal(t) => logger.error("Error while running a tick", t)
}
}
-}
-
-
-
-trait Registration {
- def cancel(): Boolean
-}
-
-trait MetricsReporter {
- def reconfigure(config: Config): Unit
-
- def start(config: Config): Unit
- def stop(): Unit
-
- def processTick(snapshot: TickSnapshot)
-}
-
-
-
-object TestingAllExample extends App {
- val recorder = Kamon.metrics.getRecorder(Entity("topo", "human-being", Map.empty))
-
- val registration = Kamon.reporters.add(new DummyReporter("test"))
-
- var x = 0
- while(true) {
- recorder.counter("test-other").increment()
- Thread.sleep(100)
- x += 1
-
- if(x == 50) {
- registration.cancel()
- }
-
- if(x == 100) {
- println("Stopping all reporters")
- Kamon.reporters.stopAll()
- }
- }
-
-}
-
-
-class DummyReporter(name: String) extends MetricsReporter {
- override def reconfigure(config: Config): Unit = {
- println("NAME: " + name + "===> Reconfiguring Dummy")
- }
-
- override def start(config: Config): Unit = {
-
- println("NAME: " + name + "===> Starting DUMMY")
- }
-
- override def stop(): Unit = {
- println("NAME: " + name + "===> Stopping Dummy")
- }
-
- override def processTick(snapshot: TickSnapshot): Unit = {
- println("NAME: " + name + s"===> [${Thread.currentThread().getName()}] Processing a tick in dummy." + snapshot)
- println(s"From: ${snapshot.interval.from}, to: ${snapshot.interval.to}")
- snapshot.entities.foreach { e =>
- println(e.counters.map(c => s"Counter [${c.name}] => " + c.value).mkString(", "))
- }
- }
} \ No newline at end of file