aboutsummaryrefslogtreecommitdiff
path: root/kamon-jdbc
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2015-01-12 01:45:27 +0100
committerIvan Topolnjak <ivantopo@gmail.com>2015-01-24 23:19:01 +0100
commit01a34f67ff75419c440f2e69c0a0db888a670a34 (patch)
tree9c4dee4e9c13c26937356950f9e4927c3f9dfb7d /kamon-jdbc
parent4a47e92d23af371f1d50b40af6cbe00a5ffc0105 (diff)
downloadKamon-01a34f67ff75419c440f2e69c0a0db888a670a34.tar.gz
Kamon-01a34f67ff75419c440f2e69c0a0db888a670a34.tar.bz2
Kamon-01a34f67ff75419c440f2e69c0a0db888a670a34.zip
! all: improve the metric recorders infrastructure
Diffstat (limited to 'kamon-jdbc')
-rw-r--r--kamon-jdbc/src/main/resources/reference.conf11
-rw-r--r--kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala44
-rw-r--r--kamon-jdbc/src/main/scala/kamon/jdbc/metric/StatementsMetrics.scala75
-rw-r--r--kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala195
4 files changed, 110 insertions, 215 deletions
diff --git a/kamon-jdbc/src/main/resources/reference.conf b/kamon-jdbc/src/main/resources/reference.conf
index 7c905f0b..e697d58c 100644
--- a/kamon-jdbc/src/main/resources/reference.conf
+++ b/kamon-jdbc/src/main/resources/reference.conf
@@ -15,15 +15,4 @@ kamon {
# Fully qualified name of the implementation of kamon.jdbc.JdbcNameGenerator that will be used for assigning names to segments.
name-generator = kamon.jdbc.DefaultJdbcNameGenerator
}
-
- metrics {
- precision {
- jdbc {
- statements {
- reads = ${kamon.metrics.precision.default-histogram-precision}
- writes = ${kamon.metrics.precision.default-histogram-precision}
- }
- }
- }
- }
} \ No newline at end of file
diff --git a/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala b/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala
index b2f35c6c..386cf019 100644
--- a/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala
+++ b/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala
@@ -17,13 +17,10 @@ package kamon.jdbc.instrumentation
import java.util.concurrent.TimeUnit.{ NANOSECONDS ⇒ nanos }
-import akka.actor.ActorSystem
-import kamon.Kamon
-import kamon.jdbc.Jdbc
+import kamon.jdbc.{ JdbcExtension, Jdbc }
import kamon.jdbc.metric.StatementsMetrics
-import kamon.jdbc.metric.StatementsMetricsGroupFactory.GroupRecorder
import kamon.metric.Metrics
-import kamon.trace.{ TraceContext, SegmentCategory, TraceRecorder }
+import kamon.trace.{ TraceContext, SegmentCategory }
import org.aspectj.lang.ProceedingJoinPoint
import org.aspectj.lang.annotation.{ Around, Aspect, Pointcut }
import org.slf4j.LoggerFactory
@@ -46,15 +43,16 @@ class StatementInstrumentation {
@Around("onExecuteStatement(sql) || onExecutePreparedStatement(sql) || onExecutePreparedCall(sql)")
def aroundExecuteStatement(pjp: ProceedingJoinPoint, sql: String): Any = {
- TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒
-
- implicit val statementRecorder: Option[GroupRecorder] = Kamon(Metrics)(system).register(StatementsMetrics(Statements), StatementsMetrics.Factory)
+ TraceContext.map { ctx ⇒
+ val metricsExtension = ctx.lookupExtension(Metrics)
+ val jdbcExtension = ctx.lookupExtension(Jdbc)
+ implicit val statementRecorder = metricsExtension.register(StatementsMetrics, "jdbc-statements").map(_.recorder)
sql.replaceAll(CommentPattern, Empty) match {
- case SelectStatement(_) ⇒ withSegment(ctx, system, Select)(recordRead(pjp, sql, system))
- case InsertStatement(_) ⇒ withSegment(ctx, system, Insert)(recordWrite(pjp, sql, system))
- case UpdateStatement(_) ⇒ withSegment(ctx, system, Update)(recordWrite(pjp, sql, system))
- case DeleteStatement(_) ⇒ withSegment(ctx, system, Delete)(recordWrite(pjp, sql, system))
+ case SelectStatement(_) ⇒ withSegment(ctx, Select, jdbcExtension)(recordRead(pjp, sql, jdbcExtension))
+ case InsertStatement(_) ⇒ withSegment(ctx, Insert, jdbcExtension)(recordWrite(pjp, sql, jdbcExtension))
+ case UpdateStatement(_) ⇒ withSegment(ctx, Update, jdbcExtension)(recordWrite(pjp, sql, jdbcExtension))
+ case DeleteStatement(_) ⇒ withSegment(ctx, Delete, jdbcExtension)(recordWrite(pjp, sql, jdbcExtension))
case anythingElse ⇒
log.debug(s"Unable to parse sql [$sql]")
pjp.proceed()
@@ -67,27 +65,27 @@ class StatementInstrumentation {
try thunk finally timeSpent(System.nanoTime() - start)
}
- def withSegment[A](ctx: TraceContext, system: ActorSystem, statement: String)(thunk: ⇒ A): A = {
- val segmentName = Jdbc(system).generateJdbcSegmentName(statement)
+ def withSegment[A](ctx: TraceContext, statement: String, jdbcExtension: JdbcExtension)(thunk: ⇒ A): A = {
+ val segmentName = jdbcExtension.generateJdbcSegmentName(statement)
val segment = ctx.startSegment(segmentName, SegmentCategory.Database, Jdbc.SegmentLibraryName)
try thunk finally segment.finish()
}
- def recordRead(pjp: ProceedingJoinPoint, sql: String, system: ActorSystem)(implicit statementRecorder: Option[GroupRecorder]): Any = {
- withTimeSpent(pjp.proceedWithErrorHandler(sql, system)) { timeSpent ⇒
+ def recordRead(pjp: ProceedingJoinPoint, sql: String, jdbcExtension: JdbcExtension)(implicit statementRecorder: Option[StatementsMetrics]): Any = {
+ withTimeSpent(pjp.proceedWithErrorHandler(sql, jdbcExtension)) { timeSpent ⇒
statementRecorder.map(stmr ⇒ stmr.reads.record(timeSpent))
val timeSpentInMillis = nanos.toMillis(timeSpent)
- if (timeSpentInMillis >= Jdbc(system).slowQueryThreshold) {
- statementRecorder.map(stmr ⇒ stmr.slow.increment())
- Jdbc(system).processSlowQuery(sql, timeSpentInMillis)
+ if (timeSpentInMillis >= jdbcExtension.slowQueryThreshold) {
+ statementRecorder.map(stmr ⇒ stmr.slows.increment())
+ jdbcExtension.processSlowQuery(sql, timeSpentInMillis)
}
}
}
- def recordWrite(pjp: ProceedingJoinPoint, sql: String, system: ActorSystem)(implicit statementRecorder: Option[GroupRecorder]): Any = {
- withTimeSpent(pjp.proceedWithErrorHandler(sql, system)) { timeSpent ⇒
+ def recordWrite(pjp: ProceedingJoinPoint, sql: String, jdbcExtension: JdbcExtension)(implicit statementRecorder: Option[StatementsMetrics]): Any = {
+ withTimeSpent(pjp.proceedWithErrorHandler(sql, jdbcExtension)) { timeSpent ⇒
statementRecorder.map(stmr ⇒ stmr.writes.record(timeSpent))
}
}
@@ -109,12 +107,12 @@ object StatementInstrumentation {
val Delete = "Delete"
implicit class PimpedProceedingJoinPoint(pjp: ProceedingJoinPoint) {
- def proceedWithErrorHandler(sql: String, system: ActorSystem)(implicit statementRecorder: Option[GroupRecorder]): Any = {
+ def proceedWithErrorHandler(sql: String, jdbcExtension: JdbcExtension)(implicit statementRecorder: Option[StatementsMetrics]): Any = {
try {
pjp.proceed()
} catch {
case NonFatal(cause) ⇒
- Jdbc(system).processSqlError(sql, cause)
+ jdbcExtension.processSqlError(sql, cause)
statementRecorder.map(stmr ⇒ stmr.errors.increment())
throw cause
}
diff --git a/kamon-jdbc/src/main/scala/kamon/jdbc/metric/StatementsMetrics.scala b/kamon-jdbc/src/main/scala/kamon/jdbc/metric/StatementsMetrics.scala
index 7ba8b105..e1d6689c 100644
--- a/kamon-jdbc/src/main/scala/kamon/jdbc/metric/StatementsMetrics.scala
+++ b/kamon-jdbc/src/main/scala/kamon/jdbc/metric/StatementsMetrics.scala
@@ -16,68 +16,17 @@
package kamon.jdbc.metric
-import akka.actor.ActorSystem
-import com.typesafe.config.Config
-import kamon.metric.{ CollectionContext, MetricGroupCategory, MetricGroupFactory, MetricGroupIdentity, MetricGroupRecorder, MetricGroupSnapshot, MetricIdentity, MetricSnapshot }
-import kamon.metric.instrument.{ Counter, Histogram }
-
-case class StatementsMetrics(name: String) extends MetricGroupIdentity {
- val category = StatementsMetrics
-}
-
-object StatementsMetrics extends MetricGroupCategory {
- val name = "jdbc-statements"
-
- case object Writes extends MetricIdentity { val name = "writes" }
- case object Reads extends MetricIdentity { val name = "reads" }
- case object Slows extends MetricIdentity { val name = "slow-queries" }
- case object Errors extends MetricIdentity { val name = "errors" }
-
- case class StatementsMetricsRecorder(writes: Histogram, reads: Histogram, slow: Counter, errors: Counter)
- extends MetricGroupRecorder {
-
- def collect(context: CollectionContext): MetricGroupSnapshot = {
- StatementsMetricsSnapshot(writes.collect(context), reads.collect(context), slow.collect(context), errors.collect(context))
- }
-
- def cleanup: Unit = {}
- }
-
- case class StatementsMetricsSnapshot(writes: Histogram.Snapshot, reads: Histogram.Snapshot, slows: Counter.Snapshot, errors: Counter.Snapshot)
- extends MetricGroupSnapshot {
-
- type GroupSnapshotType = StatementsMetricsSnapshot
-
- def merge(that: StatementsMetricsSnapshot, context: CollectionContext): GroupSnapshotType = {
- StatementsMetricsSnapshot(writes.merge(that.writes, context), reads.merge(that.reads, context), slows.merge(that.slows, context), errors.merge(that.errors, context))
- }
-
- lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map(
- Writes -> writes,
- Reads -> reads,
- Slows -> slows,
- Reads -> errors)
- }
-
- val Factory = StatementsMetricsGroupFactory
-}
-
-case object StatementsMetricsGroupFactory extends MetricGroupFactory {
- import kamon.jdbc.metric.StatementsMetrics._
-
- type GroupRecorder = StatementsMetricsRecorder
-
- def create(config: Config, system: ActorSystem): GroupRecorder = {
- val settings = config.getConfig("precision.jdbc.statements")
-
- val writesConfig = settings.getConfig("writes")
- val readsConfig = settings.getConfig("reads")
-
- new StatementsMetricsRecorder(
- Histogram.fromConfig(writesConfig),
- Histogram.fromConfig(readsConfig),
- Counter(),
- Counter())
- }
+import kamon.metric._
+import kamon.metric.instrument.{ Time, InstrumentFactory }
+
+class StatementsMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) {
+ val reads = histogram("reads", Time.Nanoseconds)
+ val writes = histogram("writes", Time.Nanoseconds)
+ val slows = counter("slows")
+ val errors = counter("errors")
}
+object StatementsMetrics extends EntityRecorderFactory[StatementsMetrics] {
+ def category: String = "jdbc-statements"
+ def createRecorder(instrumentFactory: InstrumentFactory): StatementsMetrics = new StatementsMetrics(instrumentFactory)
+} \ No newline at end of file
diff --git a/kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala b/kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala
index 534edd57..e150d967 100644
--- a/kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala
+++ b/kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala
@@ -17,40 +17,33 @@ package kamon.jdbc.instrumentation
import java.sql.{ DriverManager, SQLException }
-import akka.actor.ActorSystem
-import akka.testkit.{ TestKitBase, TestProbe }
import com.typesafe.config.ConfigFactory
-import kamon.Kamon
import kamon.jdbc.{ Jdbc, JdbcNameGenerator, SqlErrorProcessor, SlowQueryProcessor }
-import kamon.jdbc.metric.StatementsMetrics
-import kamon.jdbc.metric.StatementsMetrics.StatementsMetricsSnapshot
-import kamon.metric.{ TraceMetrics, Metrics }
-import kamon.metric.Subscriptions.TickMetricSnapshot
-import kamon.metric.TraceMetrics.TraceMetricsSnapshot
-import kamon.trace.{ SegmentCategory, SegmentMetricIdentity, TraceRecorder }
-import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike }
-
-import scala.concurrent.duration._
-
-class StatementInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with BeforeAndAfterAll {
-
- implicit lazy val system: ActorSystem = ActorSystem("jdbc-spec", ConfigFactory.parseString(
- """
- |kamon {
- | jdbc {
- | slow-query-threshold = 100 milliseconds
- |
- | # Fully qualified name of the implementation of kamon.jdbc.SlowQueryProcessor.
- | slow-query-processor = kamon.jdbc.instrumentation.NoOpSlowQueryProcessor
- |
- | # Fully qualified name of the implementation of kamon.jdbc.SqlErrorProcessor.
- | sql-error-processor = kamon.jdbc.instrumentation.NoOpSqlErrorProcessor
- |
- | # Fully qualified name of the implementation of kamon.jdbc.JdbcNameGenerator
- | name-generator = kamon.jdbc.instrumentation.NoOpJdbcNameGenerator
- | }
- |}
- """.stripMargin))
+import kamon.metric.TraceMetricsSpec
+import kamon.testkit.BaseKamonSpec
+import kamon.trace.{ SegmentCategory, TraceContext }
+
+class StatementInstrumentationSpec extends BaseKamonSpec("jdbc-spec") {
+ import TraceMetricsSpec.SegmentSyntax
+
+ override lazy val config =
+ ConfigFactory.parseString(
+ """
+ |kamon {
+ | jdbc {
+ | slow-query-threshold = 100 milliseconds
+ |
+ | # Fully qualified name of the implementation of kamon.jdbc.SlowQueryProcessor.
+ | slow-query-processor = kamon.jdbc.instrumentation.NoOpSlowQueryProcessor
+ |
+ | # Fully qualified name of the implementation of kamon.jdbc.SqlErrorProcessor.
+ | sql-error-processor = kamon.jdbc.instrumentation.NoOpSqlErrorProcessor
+ |
+ | # Fully qualified name of the implementation of kamon.jdbc.JdbcNameGenerator
+ | name-generator = kamon.jdbc.instrumentation.NoOpJdbcNameGenerator
+ | }
+ |}
+ """.stripMargin)
val connection = DriverManager.getConnection("jdbc:h2:mem:jdbc-spec", "SA", "")
@@ -67,117 +60,105 @@ class StatementInstrumentationSpec extends TestKitBase with WordSpecLike with Ma
}
"the StatementInstrumentation" should {
- "record the execution time of INSERT operation" in new StatementsMetricsListenerFixture {
- TraceRecorder.withNewTraceContext("jdbc-trace-insert") {
-
- val metricsListener = subscribeToMetrics()
-
+ "record the execution time of INSERT operation" in {
+ TraceContext.withContext(newContext("jdbc-trace-insert")) {
for (id ← 1 to 100) {
val insert = s"INSERT INTO Address (Nr, Name) VALUES($id, 'foo')"
val insertStatement = connection.prepareStatement(insert)
insertStatement.execute()
}
- val StatementMetrics = expectStatementsMetrics(metricsListener, 3 seconds)
- StatementMetrics.writes.numberOfMeasurements should be(100)
-
- TraceRecorder.finish()
+ TraceContext.currentContext.finish()
}
- val snapshot = takeSnapshotOf("jdbc-trace-insert")
- snapshot.elapsedTime.numberOfMeasurements should be(1)
- snapshot.segments.size should be(1)
- snapshot.segments(SegmentMetricIdentity("Jdbc[Insert]", SegmentCategory.Database, Jdbc.SegmentLibraryName)).numberOfMeasurements should be(100)
- }
+ val jdbcSnapshot = takeSnapshotOf("jdbc-statements", "jdbc-statements")
+ jdbcSnapshot.histogram("writes").get.numberOfMeasurements should be(100)
- "record the execution time of SELECT operation" in new StatementsMetricsListenerFixture {
- TraceRecorder.withNewTraceContext("jdbc-trace-select") {
-
- val metricsListener = subscribeToMetrics()
+ val traceSnapshot = takeSnapshotOf("jdbc-trace-insert", "trace")
+ traceSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(1)
+ traceSnapshot.segments.size should be(1)
+ traceSnapshot.segment("Jdbc[Insert]", SegmentCategory.Database, Jdbc.SegmentLibraryName).numberOfMeasurements should be(100)
+ }
+ "record the execution time of SELECT operation" in {
+ TraceContext.withContext(newContext("jdbc-trace-select")) {
for (id ← 1 to 100) {
val select = s"SELECT * FROM Address where Nr = $id"
val selectStatement = connection.createStatement()
selectStatement.execute(select)
}
- val StatementMetrics = expectStatementsMetrics(metricsListener, 3 seconds)
- StatementMetrics.reads.numberOfMeasurements should be(100)
-
- TraceRecorder.finish()
+ TraceContext.currentContext.finish()
}
- val snapshot = takeSnapshotOf("jdbc-trace-select")
- snapshot.elapsedTime.numberOfMeasurements should be(1)
- snapshot.segments.size should be(1)
- snapshot.segments(SegmentMetricIdentity("Jdbc[Select]", SegmentCategory.Database, Jdbc.SegmentLibraryName)).numberOfMeasurements should be(100)
- }
-
- "record the execution time of UPDATE operation" in new StatementsMetricsListenerFixture {
- TraceRecorder.withNewTraceContext("jdbc-trace-update") {
+ val jdbcSnapshot = takeSnapshotOf("jdbc-statements", "jdbc-statements")
+ jdbcSnapshot.histogram("reads").get.numberOfMeasurements should be(100)
- val metricsListener = subscribeToMetrics()
+ val traceSnapshot = takeSnapshotOf("jdbc-trace-select", "trace")
+ traceSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(1)
+ traceSnapshot.segments.size should be(1)
+ traceSnapshot.segment("Jdbc[Select]", SegmentCategory.Database, Jdbc.SegmentLibraryName).numberOfMeasurements should be(100)
+ }
+ "record the execution time of UPDATE operation" in {
+ TraceContext.withContext(newContext("jdbc-trace-update")) {
for (id ← 1 to 100) {
val update = s"UPDATE Address SET Name = 'bar$id' where Nr = $id"
val updateStatement = connection.prepareStatement(update)
updateStatement.execute()
}
- val StatementMetrics = expectStatementsMetrics(metricsListener, 3 seconds)
- StatementMetrics.writes.numberOfMeasurements should be(100)
- TraceRecorder.finish()
+ TraceContext.currentContext.finish()
}
- val snapshot = takeSnapshotOf("jdbc-trace-update")
- snapshot.elapsedTime.numberOfMeasurements should be(1)
- snapshot.segments.size should be(1)
- snapshot.segments(SegmentMetricIdentity("Jdbc[Update]", SegmentCategory.Database, Jdbc.SegmentLibraryName)).numberOfMeasurements should be(100)
- }
-
- "record the execution time of DELETE operation" in new StatementsMetricsListenerFixture {
- TraceRecorder.withNewTraceContext("jdbc-trace-delete") {
+ val jdbcSnapshot = takeSnapshotOf("jdbc-statements", "jdbc-statements")
+ jdbcSnapshot.histogram("writes").get.numberOfMeasurements should be(100)
- val metricsListener = subscribeToMetrics()
+ val traceSnapshot = takeSnapshotOf("jdbc-trace-update", "trace")
+ traceSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(1)
+ traceSnapshot.segments.size should be(1)
+ traceSnapshot.segment("Jdbc[Update]", SegmentCategory.Database, Jdbc.SegmentLibraryName).numberOfMeasurements should be(100)
+ }
+ "record the execution time of DELETE operation" in {
+ TraceContext.withContext(newContext("jdbc-trace-delete")) {
for (id ← 1 to 100) {
val delete = s"DELETE FROM Address where Nr = $id"
val deleteStatement = connection.createStatement()
deleteStatement.execute(delete)
}
- val StatementMetrics = expectStatementsMetrics(metricsListener, 3 seconds)
- StatementMetrics.writes.numberOfMeasurements should be(100)
- TraceRecorder.finish()
+ TraceContext.currentContext.finish()
}
- val snapshot = takeSnapshotOf("jdbc-trace-delete")
- snapshot.elapsedTime.numberOfMeasurements should be(1)
- snapshot.segments.size should be(1)
- snapshot.segments(SegmentMetricIdentity("Jdbc[Delete]", SegmentCategory.Database, Jdbc.SegmentLibraryName)).numberOfMeasurements should be(100)
- }
+ val jdbcSnapshot = takeSnapshotOf("jdbc-statements", "jdbc-statements")
+ jdbcSnapshot.histogram("writes").get.numberOfMeasurements should be(100)
- "record the execution time of SLOW QUERIES based on the kamon.jdbc.slow-query-threshold" in new StatementsMetricsListenerFixture {
- TraceRecorder.withNewTraceContext("jdbc-trace-slow") {
+ val traceSnapshot = takeSnapshotOf("jdbc-trace-delete", "trace")
+ traceSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(1)
+ traceSnapshot.segments.size should be(1)
+ traceSnapshot.segment("Jdbc[Delete]", SegmentCategory.Database, Jdbc.SegmentLibraryName).numberOfMeasurements should be(100)
- val metricsListener = subscribeToMetrics()
+ }
+ "record the execution time of SLOW QUERIES based on the kamon.jdbc.slow-query-threshold" in {
+ TraceContext.withContext(newContext("jdbc-trace-slow")) {
for (id ← 1 to 2) {
val select = s"SELECT * FROM Address; CALL SLEEP(100)"
val selectStatement = connection.createStatement()
selectStatement.execute(select)
}
- val StatementMetrics = expectStatementsMetrics(metricsListener, 3 seconds)
- StatementMetrics.slows.count should be(2)
+ TraceContext.currentContext.finish()
}
- }
- "count all SQL ERRORS" in new StatementsMetricsListenerFixture {
- TraceRecorder.withNewTraceContext("jdbc-trace-errors") {
+ val jdbcSnapshot = takeSnapshotOf("jdbc-statements", "jdbc-statements")
+ jdbcSnapshot.counter("slows").get.count should be(2)
- val metricsListener = subscribeToMetrics()
+ }
+ "count all SQL ERRORS" in {
+ TraceContext.withContext(newContext("jdbc-trace-errors")) {
for (_ ← 1 to 10) {
intercept[SQLException] {
val error = "SELECT * FROM NO_EXISTENT_TABLE"
@@ -185,35 +166,13 @@ class StatementInstrumentationSpec extends TestKitBase with WordSpecLike with Ma
errorStatement.execute(error)
}
}
- val StatementMetrics = expectStatementsMetrics(metricsListener, 3 seconds)
- StatementMetrics.errors.count should be(10)
- }
- }
- }
- trait StatementsMetricsListenerFixture {
- def subscribeToMetrics(): TestProbe = {
- val metricsListener = TestProbe()
- Kamon(Metrics).subscribe(StatementsMetrics, "*", metricsListener.ref, permanently = true)
- // Wait for one empty snapshot before proceeding to the test.
- metricsListener.expectMsgType[TickMetricSnapshot]
- metricsListener
- }
- }
+ TraceContext.currentContext.finish()
+ }
- def expectStatementsMetrics(listener: TestProbe, waitTime: FiniteDuration): StatementsMetricsSnapshot = {
- val tickSnapshot = within(waitTime) {
- listener.expectMsgType[TickMetricSnapshot]
+ val jdbcSnapshot = takeSnapshotOf("jdbc-statements", "jdbc-statements")
+ jdbcSnapshot.counter("errors").get.count should be(10)
}
- val statementsMetricsOption = tickSnapshot.metrics.get(StatementsMetrics(StatementInstrumentation.Statements))
- statementsMetricsOption should not be empty
- statementsMetricsOption.get.asInstanceOf[StatementsMetricsSnapshot]
- }
-
- def takeSnapshotOf(traceName: String): TraceMetricsSnapshot = {
- val recorder = Kamon(Metrics)(system).register(TraceMetrics(traceName), TraceMetrics.Factory)
- val collectionContext = Kamon(Metrics)(system).buildDefaultCollectionContext
- recorder.get.collect(collectionContext)
}
}