aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2017-06-06 14:15:15 +0200
committerIvan Topolnjak <ivantopo@gmail.com>2017-06-06 14:15:15 +0200
commitc52f8eaca0d1ccc4c992cba039e35e099b5b478b (patch)
treef9e78e2f929627e7547bef39fdf6cbcd544cb8d8 /kamon-core/src/main/scala/kamon/ReporterRegistry.scala
parent1f5d9876dedb715ae1c31203ea4f15ebf031612c (diff)
downloadKamon-c52f8eaca0d1ccc4c992cba039e35e099b5b478b.tar.gz
Kamon-c52f8eaca0d1ccc4c992cba039e35e099b5b478b.tar.bz2
Kamon-c52f8eaca0d1ccc4c992cba039e35e099b5b478b.zip
make it compile for Scala 2.11 and 2.12
Diffstat (limited to 'kamon-core/src/main/scala/kamon/ReporterRegistry.scala')
-rw-r--r--kamon-core/src/main/scala/kamon/ReporterRegistry.scala38
1 files changed, 18 insertions, 20 deletions
diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
index 3b59d8b7..a22162eb 100644
--- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
+++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
@@ -1,6 +1,5 @@
package kamon
-import java.time.Instant
import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
import java.util.concurrent._
@@ -12,6 +11,7 @@ import kamon.trace.Span
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}
import scala.util.Try
import scala.util.control.NonFatal
+import scala.collection.JavaConverters._
trait ReporterRegistry {
def loadFromConfig(): Unit
@@ -60,7 +60,7 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con
override def add(reporter: MetricsReporter, name: String): Registration = {
val executor = Executors.newSingleThreadExecutor(threadFactory(name))
- val reporterEntry = ReporterEntry(
+ val reporterEntry = new ReporterEntry(
id = reporterCounter.getAndIncrement(),
reporter = reporter,
executionContext = ExecutionContext.fromExecutorService(executor)
@@ -71,12 +71,10 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con
new Registration {
val reporterID = reporterEntry.id
override def cancel(): Boolean = {
- metricReporters.removeIf(entry => {
- if(entry.id == reporterID) {
- stopReporter(entry)
- true
- } else false
- })
+ metricReporters.iterator().asScala
+ .find(e => e.id == reporterID)
+ .map(e => stopReporter(e))
+ .isDefined
}
}
}
@@ -99,31 +97,31 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con
}
}
- Future.sequence(reporterStopFutures.result()).transform(_ => Try((): Unit))
+ Future.sequence(reporterStopFutures.result()).map(_ => Try((): Unit))
}
private[kamon] def reconfigure(config: Config): Unit = synchronized {
- val tickInterval = config.getDuration("kamon.metric.tick-interval")
+ val tickIntervalMillis = config.getDuration("kamon.metric.tick-interval", TimeUnit.MILLISECONDS)
val currentTicker = metricsTickerSchedule.get()
if(currentTicker != null) {
currentTicker.cancel(true)
}
// Reconfigure all registered reporters
- metricReporters.forEach(entry =>
+ metricReporters.iterator().asScala.foreach(entry =>
Future(entry.reporter.reconfigure(config))(entry.executionContext)
)
metricsTickerSchedule.set {
registryExecutionContext.scheduleAtFixedRate(
- new MetricTicker(metrics, metricReporters), tickInterval.toMillis, tickInterval.toMillis, TimeUnit.MILLISECONDS
+ new MetricTicker(metrics, metricReporters), tickIntervalMillis, tickIntervalMillis, TimeUnit.MILLISECONDS
)
}
}
private[kamon] def reportSpan(span: Span.CompletedSpan): Unit = {
- spanReporters.forEach(_.reportSpan(span))
+ spanReporters.iterator().asScala.foreach(_.reportSpan(span))
}
private def stopReporter(entry: ReporterEntry): Future[Unit] = {
@@ -134,25 +132,25 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con
}(ExecutionContext.fromExecutor(registryExecutionContext))
}
- private case class ReporterEntry(
+ private class ReporterEntry(
@volatile var isActive: Boolean = true,
- id: Long,
- reporter: MetricsReporter,
- executionContext: ExecutionContextExecutorService
+ val id: Long,
+ val reporter: MetricsReporter,
+ val executionContext: ExecutionContextExecutorService
)
private class MetricTicker(snapshotGenerator: MetricsSnapshotGenerator, reporterEntries: java.util.Queue[ReporterEntry]) extends Runnable {
val logger = Logger(classOf[MetricTicker])
- var lastTick = Instant.now()
+ var lastTick = System.currentTimeMillis()
def run(): Unit = try {
- val currentTick = Instant.now()
+ val currentTick = System.currentTimeMillis()
val tickSnapshot = TickSnapshot(
interval = Interval(lastTick, currentTick),
metrics = snapshotGenerator.snapshot()
)
- reporterEntries.forEach { entry =>
+ reporterEntries.iterator().asScala.foreach { entry =>
Future {
if(entry.isActive)
entry.reporter.reportTickSnapshot(tickSnapshot)