diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2015-01-12 01:45:27 +0100 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2015-01-24 23:19:01 +0100 |
commit | 01a34f67ff75419c440f2e69c0a0db888a670a34 (patch) | |
tree | 9c4dee4e9c13c26937356950f9e4927c3f9dfb7d /kamon-jdbc/src | |
parent | 4a47e92d23af371f1d50b40af6cbe00a5ffc0105 (diff) | |
download | Kamon-01a34f67ff75419c440f2e69c0a0db888a670a34.tar.gz Kamon-01a34f67ff75419c440f2e69c0a0db888a670a34.tar.bz2 Kamon-01a34f67ff75419c440f2e69c0a0db888a670a34.zip |
! all: improve the metric recorders infrastructure
Diffstat (limited to 'kamon-jdbc/src')
4 files changed, 110 insertions, 215 deletions
diff --git a/kamon-jdbc/src/main/resources/reference.conf b/kamon-jdbc/src/main/resources/reference.conf index 7c905f0b..e697d58c 100644 --- a/kamon-jdbc/src/main/resources/reference.conf +++ b/kamon-jdbc/src/main/resources/reference.conf @@ -15,15 +15,4 @@ kamon { # Fully qualified name of the implementation of kamon.jdbc.JdbcNameGenerator that will be used for assigning names to segments. name-generator = kamon.jdbc.DefaultJdbcNameGenerator } - - metrics { - precision { - jdbc { - statements { - reads = ${kamon.metrics.precision.default-histogram-precision} - writes = ${kamon.metrics.precision.default-histogram-precision} - } - } - } - } }
\ No newline at end of file diff --git a/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala b/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala index b2f35c6c..386cf019 100644 --- a/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala +++ b/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala @@ -17,13 +17,10 @@ package kamon.jdbc.instrumentation import java.util.concurrent.TimeUnit.{ NANOSECONDS ⇒ nanos } -import akka.actor.ActorSystem -import kamon.Kamon -import kamon.jdbc.Jdbc +import kamon.jdbc.{ JdbcExtension, Jdbc } import kamon.jdbc.metric.StatementsMetrics -import kamon.jdbc.metric.StatementsMetricsGroupFactory.GroupRecorder import kamon.metric.Metrics -import kamon.trace.{ TraceContext, SegmentCategory, TraceRecorder } +import kamon.trace.{ TraceContext, SegmentCategory } import org.aspectj.lang.ProceedingJoinPoint import org.aspectj.lang.annotation.{ Around, Aspect, Pointcut } import org.slf4j.LoggerFactory @@ -46,15 +43,16 @@ class StatementInstrumentation { @Around("onExecuteStatement(sql) || onExecutePreparedStatement(sql) || onExecutePreparedCall(sql)") def aroundExecuteStatement(pjp: ProceedingJoinPoint, sql: String): Any = { - TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒ - - implicit val statementRecorder: Option[GroupRecorder] = Kamon(Metrics)(system).register(StatementsMetrics(Statements), StatementsMetrics.Factory) + TraceContext.map { ctx ⇒ + val metricsExtension = ctx.lookupExtension(Metrics) + val jdbcExtension = ctx.lookupExtension(Jdbc) + implicit val statementRecorder = metricsExtension.register(StatementsMetrics, "jdbc-statements").map(_.recorder) sql.replaceAll(CommentPattern, Empty) match { - case SelectStatement(_) ⇒ withSegment(ctx, system, Select)(recordRead(pjp, sql, system)) - case InsertStatement(_) ⇒ withSegment(ctx, system, Insert)(recordWrite(pjp, sql, system)) - case UpdateStatement(_) ⇒ withSegment(ctx, system, Update)(recordWrite(pjp, sql, system)) - case DeleteStatement(_) ⇒ withSegment(ctx, system, Delete)(recordWrite(pjp, sql, system)) + case SelectStatement(_) ⇒ withSegment(ctx, Select, jdbcExtension)(recordRead(pjp, sql, jdbcExtension)) + case InsertStatement(_) ⇒ withSegment(ctx, Insert, jdbcExtension)(recordWrite(pjp, sql, jdbcExtension)) + case UpdateStatement(_) ⇒ withSegment(ctx, Update, jdbcExtension)(recordWrite(pjp, sql, jdbcExtension)) + case DeleteStatement(_) ⇒ withSegment(ctx, Delete, jdbcExtension)(recordWrite(pjp, sql, jdbcExtension)) case anythingElse ⇒ log.debug(s"Unable to parse sql [$sql]") pjp.proceed() @@ -67,27 +65,27 @@ class StatementInstrumentation { try thunk finally timeSpent(System.nanoTime() - start) } - def withSegment[A](ctx: TraceContext, system: ActorSystem, statement: String)(thunk: ⇒ A): A = { - val segmentName = Jdbc(system).generateJdbcSegmentName(statement) + def withSegment[A](ctx: TraceContext, statement: String, jdbcExtension: JdbcExtension)(thunk: ⇒ A): A = { + val segmentName = jdbcExtension.generateJdbcSegmentName(statement) val segment = ctx.startSegment(segmentName, SegmentCategory.Database, Jdbc.SegmentLibraryName) try thunk finally segment.finish() } - def recordRead(pjp: ProceedingJoinPoint, sql: String, system: ActorSystem)(implicit statementRecorder: Option[GroupRecorder]): Any = { - withTimeSpent(pjp.proceedWithErrorHandler(sql, system)) { timeSpent ⇒ + def recordRead(pjp: ProceedingJoinPoint, sql: String, jdbcExtension: JdbcExtension)(implicit statementRecorder: Option[StatementsMetrics]): Any = { + withTimeSpent(pjp.proceedWithErrorHandler(sql, jdbcExtension)) { timeSpent ⇒ statementRecorder.map(stmr ⇒ stmr.reads.record(timeSpent)) val timeSpentInMillis = nanos.toMillis(timeSpent) - if (timeSpentInMillis >= Jdbc(system).slowQueryThreshold) { - statementRecorder.map(stmr ⇒ stmr.slow.increment()) - Jdbc(system).processSlowQuery(sql, timeSpentInMillis) + if (timeSpentInMillis >= jdbcExtension.slowQueryThreshold) { + statementRecorder.map(stmr ⇒ stmr.slows.increment()) + jdbcExtension.processSlowQuery(sql, timeSpentInMillis) } } } - def recordWrite(pjp: ProceedingJoinPoint, sql: String, system: ActorSystem)(implicit statementRecorder: Option[GroupRecorder]): Any = { - withTimeSpent(pjp.proceedWithErrorHandler(sql, system)) { timeSpent ⇒ + def recordWrite(pjp: ProceedingJoinPoint, sql: String, jdbcExtension: JdbcExtension)(implicit statementRecorder: Option[StatementsMetrics]): Any = { + withTimeSpent(pjp.proceedWithErrorHandler(sql, jdbcExtension)) { timeSpent ⇒ statementRecorder.map(stmr ⇒ stmr.writes.record(timeSpent)) } } @@ -109,12 +107,12 @@ object StatementInstrumentation { val Delete = "Delete" implicit class PimpedProceedingJoinPoint(pjp: ProceedingJoinPoint) { - def proceedWithErrorHandler(sql: String, system: ActorSystem)(implicit statementRecorder: Option[GroupRecorder]): Any = { + def proceedWithErrorHandler(sql: String, jdbcExtension: JdbcExtension)(implicit statementRecorder: Option[StatementsMetrics]): Any = { try { pjp.proceed() } catch { case NonFatal(cause) ⇒ - Jdbc(system).processSqlError(sql, cause) + jdbcExtension.processSqlError(sql, cause) statementRecorder.map(stmr ⇒ stmr.errors.increment()) throw cause } diff --git a/kamon-jdbc/src/main/scala/kamon/jdbc/metric/StatementsMetrics.scala b/kamon-jdbc/src/main/scala/kamon/jdbc/metric/StatementsMetrics.scala index 7ba8b105..e1d6689c 100644 --- a/kamon-jdbc/src/main/scala/kamon/jdbc/metric/StatementsMetrics.scala +++ b/kamon-jdbc/src/main/scala/kamon/jdbc/metric/StatementsMetrics.scala @@ -16,68 +16,17 @@ package kamon.jdbc.metric -import akka.actor.ActorSystem -import com.typesafe.config.Config -import kamon.metric.{ CollectionContext, MetricGroupCategory, MetricGroupFactory, MetricGroupIdentity, MetricGroupRecorder, MetricGroupSnapshot, MetricIdentity, MetricSnapshot } -import kamon.metric.instrument.{ Counter, Histogram } - -case class StatementsMetrics(name: String) extends MetricGroupIdentity { - val category = StatementsMetrics -} - -object StatementsMetrics extends MetricGroupCategory { - val name = "jdbc-statements" - - case object Writes extends MetricIdentity { val name = "writes" } - case object Reads extends MetricIdentity { val name = "reads" } - case object Slows extends MetricIdentity { val name = "slow-queries" } - case object Errors extends MetricIdentity { val name = "errors" } - - case class StatementsMetricsRecorder(writes: Histogram, reads: Histogram, slow: Counter, errors: Counter) - extends MetricGroupRecorder { - - def collect(context: CollectionContext): MetricGroupSnapshot = { - StatementsMetricsSnapshot(writes.collect(context), reads.collect(context), slow.collect(context), errors.collect(context)) - } - - def cleanup: Unit = {} - } - - case class StatementsMetricsSnapshot(writes: Histogram.Snapshot, reads: Histogram.Snapshot, slows: Counter.Snapshot, errors: Counter.Snapshot) - extends MetricGroupSnapshot { - - type GroupSnapshotType = StatementsMetricsSnapshot - - def merge(that: StatementsMetricsSnapshot, context: CollectionContext): GroupSnapshotType = { - StatementsMetricsSnapshot(writes.merge(that.writes, context), reads.merge(that.reads, context), slows.merge(that.slows, context), errors.merge(that.errors, context)) - } - - lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map( - Writes -> writes, - Reads -> reads, - Slows -> slows, - Reads -> errors) - } - - val Factory = StatementsMetricsGroupFactory -} - -case object StatementsMetricsGroupFactory extends MetricGroupFactory { - import kamon.jdbc.metric.StatementsMetrics._ - - type GroupRecorder = StatementsMetricsRecorder - - def create(config: Config, system: ActorSystem): GroupRecorder = { - val settings = config.getConfig("precision.jdbc.statements") - - val writesConfig = settings.getConfig("writes") - val readsConfig = settings.getConfig("reads") - - new StatementsMetricsRecorder( - Histogram.fromConfig(writesConfig), - Histogram.fromConfig(readsConfig), - Counter(), - Counter()) - } +import kamon.metric._ +import kamon.metric.instrument.{ Time, InstrumentFactory } + +class StatementsMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { + val reads = histogram("reads", Time.Nanoseconds) + val writes = histogram("writes", Time.Nanoseconds) + val slows = counter("slows") + val errors = counter("errors") } +object StatementsMetrics extends EntityRecorderFactory[StatementsMetrics] { + def category: String = "jdbc-statements" + def createRecorder(instrumentFactory: InstrumentFactory): StatementsMetrics = new StatementsMetrics(instrumentFactory) +}
\ No newline at end of file diff --git a/kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala b/kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala index 534edd57..e150d967 100644 --- a/kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala +++ b/kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala @@ -17,40 +17,33 @@ package kamon.jdbc.instrumentation import java.sql.{ DriverManager, SQLException } -import akka.actor.ActorSystem -import akka.testkit.{ TestKitBase, TestProbe } import com.typesafe.config.ConfigFactory -import kamon.Kamon import kamon.jdbc.{ Jdbc, JdbcNameGenerator, SqlErrorProcessor, SlowQueryProcessor } -import kamon.jdbc.metric.StatementsMetrics -import kamon.jdbc.metric.StatementsMetrics.StatementsMetricsSnapshot -import kamon.metric.{ TraceMetrics, Metrics } -import kamon.metric.Subscriptions.TickMetricSnapshot -import kamon.metric.TraceMetrics.TraceMetricsSnapshot -import kamon.trace.{ SegmentCategory, SegmentMetricIdentity, TraceRecorder } -import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike } - -import scala.concurrent.duration._ - -class StatementInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with BeforeAndAfterAll { - - implicit lazy val system: ActorSystem = ActorSystem("jdbc-spec", ConfigFactory.parseString( - """ - |kamon { - | jdbc { - | slow-query-threshold = 100 milliseconds - | - | # Fully qualified name of the implementation of kamon.jdbc.SlowQueryProcessor. - | slow-query-processor = kamon.jdbc.instrumentation.NoOpSlowQueryProcessor - | - | # Fully qualified name of the implementation of kamon.jdbc.SqlErrorProcessor. - | sql-error-processor = kamon.jdbc.instrumentation.NoOpSqlErrorProcessor - | - | # Fully qualified name of the implementation of kamon.jdbc.JdbcNameGenerator - | name-generator = kamon.jdbc.instrumentation.NoOpJdbcNameGenerator - | } - |} - """.stripMargin)) +import kamon.metric.TraceMetricsSpec +import kamon.testkit.BaseKamonSpec +import kamon.trace.{ SegmentCategory, TraceContext } + +class StatementInstrumentationSpec extends BaseKamonSpec("jdbc-spec") { + import TraceMetricsSpec.SegmentSyntax + + override lazy val config = + ConfigFactory.parseString( + """ + |kamon { + | jdbc { + | slow-query-threshold = 100 milliseconds + | + | # Fully qualified name of the implementation of kamon.jdbc.SlowQueryProcessor. + | slow-query-processor = kamon.jdbc.instrumentation.NoOpSlowQueryProcessor + | + | # Fully qualified name of the implementation of kamon.jdbc.SqlErrorProcessor. + | sql-error-processor = kamon.jdbc.instrumentation.NoOpSqlErrorProcessor + | + | # Fully qualified name of the implementation of kamon.jdbc.JdbcNameGenerator + | name-generator = kamon.jdbc.instrumentation.NoOpJdbcNameGenerator + | } + |} + """.stripMargin) val connection = DriverManager.getConnection("jdbc:h2:mem:jdbc-spec", "SA", "") @@ -67,117 +60,105 @@ class StatementInstrumentationSpec extends TestKitBase with WordSpecLike with Ma } "the StatementInstrumentation" should { - "record the execution time of INSERT operation" in new StatementsMetricsListenerFixture { - TraceRecorder.withNewTraceContext("jdbc-trace-insert") { - - val metricsListener = subscribeToMetrics() - + "record the execution time of INSERT operation" in { + TraceContext.withContext(newContext("jdbc-trace-insert")) { for (id ← 1 to 100) { val insert = s"INSERT INTO Address (Nr, Name) VALUES($id, 'foo')" val insertStatement = connection.prepareStatement(insert) insertStatement.execute() } - val StatementMetrics = expectStatementsMetrics(metricsListener, 3 seconds) - StatementMetrics.writes.numberOfMeasurements should be(100) - - TraceRecorder.finish() + TraceContext.currentContext.finish() } - val snapshot = takeSnapshotOf("jdbc-trace-insert") - snapshot.elapsedTime.numberOfMeasurements should be(1) - snapshot.segments.size should be(1) - snapshot.segments(SegmentMetricIdentity("Jdbc[Insert]", SegmentCategory.Database, Jdbc.SegmentLibraryName)).numberOfMeasurements should be(100) - } + val jdbcSnapshot = takeSnapshotOf("jdbc-statements", "jdbc-statements") + jdbcSnapshot.histogram("writes").get.numberOfMeasurements should be(100) - "record the execution time of SELECT operation" in new StatementsMetricsListenerFixture { - TraceRecorder.withNewTraceContext("jdbc-trace-select") { - - val metricsListener = subscribeToMetrics() + val traceSnapshot = takeSnapshotOf("jdbc-trace-insert", "trace") + traceSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(1) + traceSnapshot.segments.size should be(1) + traceSnapshot.segment("Jdbc[Insert]", SegmentCategory.Database, Jdbc.SegmentLibraryName).numberOfMeasurements should be(100) + } + "record the execution time of SELECT operation" in { + TraceContext.withContext(newContext("jdbc-trace-select")) { for (id ← 1 to 100) { val select = s"SELECT * FROM Address where Nr = $id" val selectStatement = connection.createStatement() selectStatement.execute(select) } - val StatementMetrics = expectStatementsMetrics(metricsListener, 3 seconds) - StatementMetrics.reads.numberOfMeasurements should be(100) - - TraceRecorder.finish() + TraceContext.currentContext.finish() } - val snapshot = takeSnapshotOf("jdbc-trace-select") - snapshot.elapsedTime.numberOfMeasurements should be(1) - snapshot.segments.size should be(1) - snapshot.segments(SegmentMetricIdentity("Jdbc[Select]", SegmentCategory.Database, Jdbc.SegmentLibraryName)).numberOfMeasurements should be(100) - } - - "record the execution time of UPDATE operation" in new StatementsMetricsListenerFixture { - TraceRecorder.withNewTraceContext("jdbc-trace-update") { + val jdbcSnapshot = takeSnapshotOf("jdbc-statements", "jdbc-statements") + jdbcSnapshot.histogram("reads").get.numberOfMeasurements should be(100) - val metricsListener = subscribeToMetrics() + val traceSnapshot = takeSnapshotOf("jdbc-trace-select", "trace") + traceSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(1) + traceSnapshot.segments.size should be(1) + traceSnapshot.segment("Jdbc[Select]", SegmentCategory.Database, Jdbc.SegmentLibraryName).numberOfMeasurements should be(100) + } + "record the execution time of UPDATE operation" in { + TraceContext.withContext(newContext("jdbc-trace-update")) { for (id ← 1 to 100) { val update = s"UPDATE Address SET Name = 'bar$id' where Nr = $id" val updateStatement = connection.prepareStatement(update) updateStatement.execute() } - val StatementMetrics = expectStatementsMetrics(metricsListener, 3 seconds) - StatementMetrics.writes.numberOfMeasurements should be(100) - TraceRecorder.finish() + TraceContext.currentContext.finish() } - val snapshot = takeSnapshotOf("jdbc-trace-update") - snapshot.elapsedTime.numberOfMeasurements should be(1) - snapshot.segments.size should be(1) - snapshot.segments(SegmentMetricIdentity("Jdbc[Update]", SegmentCategory.Database, Jdbc.SegmentLibraryName)).numberOfMeasurements should be(100) - } - - "record the execution time of DELETE operation" in new StatementsMetricsListenerFixture { - TraceRecorder.withNewTraceContext("jdbc-trace-delete") { + val jdbcSnapshot = takeSnapshotOf("jdbc-statements", "jdbc-statements") + jdbcSnapshot.histogram("writes").get.numberOfMeasurements should be(100) - val metricsListener = subscribeToMetrics() + val traceSnapshot = takeSnapshotOf("jdbc-trace-update", "trace") + traceSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(1) + traceSnapshot.segments.size should be(1) + traceSnapshot.segment("Jdbc[Update]", SegmentCategory.Database, Jdbc.SegmentLibraryName).numberOfMeasurements should be(100) + } + "record the execution time of DELETE operation" in { + TraceContext.withContext(newContext("jdbc-trace-delete")) { for (id ← 1 to 100) { val delete = s"DELETE FROM Address where Nr = $id" val deleteStatement = connection.createStatement() deleteStatement.execute(delete) } - val StatementMetrics = expectStatementsMetrics(metricsListener, 3 seconds) - StatementMetrics.writes.numberOfMeasurements should be(100) - TraceRecorder.finish() + TraceContext.currentContext.finish() } - val snapshot = takeSnapshotOf("jdbc-trace-delete") - snapshot.elapsedTime.numberOfMeasurements should be(1) - snapshot.segments.size should be(1) - snapshot.segments(SegmentMetricIdentity("Jdbc[Delete]", SegmentCategory.Database, Jdbc.SegmentLibraryName)).numberOfMeasurements should be(100) - } + val jdbcSnapshot = takeSnapshotOf("jdbc-statements", "jdbc-statements") + jdbcSnapshot.histogram("writes").get.numberOfMeasurements should be(100) - "record the execution time of SLOW QUERIES based on the kamon.jdbc.slow-query-threshold" in new StatementsMetricsListenerFixture { - TraceRecorder.withNewTraceContext("jdbc-trace-slow") { + val traceSnapshot = takeSnapshotOf("jdbc-trace-delete", "trace") + traceSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(1) + traceSnapshot.segments.size should be(1) + traceSnapshot.segment("Jdbc[Delete]", SegmentCategory.Database, Jdbc.SegmentLibraryName).numberOfMeasurements should be(100) - val metricsListener = subscribeToMetrics() + } + "record the execution time of SLOW QUERIES based on the kamon.jdbc.slow-query-threshold" in { + TraceContext.withContext(newContext("jdbc-trace-slow")) { for (id ← 1 to 2) { val select = s"SELECT * FROM Address; CALL SLEEP(100)" val selectStatement = connection.createStatement() selectStatement.execute(select) } - val StatementMetrics = expectStatementsMetrics(metricsListener, 3 seconds) - StatementMetrics.slows.count should be(2) + TraceContext.currentContext.finish() } - } - "count all SQL ERRORS" in new StatementsMetricsListenerFixture { - TraceRecorder.withNewTraceContext("jdbc-trace-errors") { + val jdbcSnapshot = takeSnapshotOf("jdbc-statements", "jdbc-statements") + jdbcSnapshot.counter("slows").get.count should be(2) - val metricsListener = subscribeToMetrics() + } + "count all SQL ERRORS" in { + TraceContext.withContext(newContext("jdbc-trace-errors")) { for (_ ← 1 to 10) { intercept[SQLException] { val error = "SELECT * FROM NO_EXISTENT_TABLE" @@ -185,35 +166,13 @@ class StatementInstrumentationSpec extends TestKitBase with WordSpecLike with Ma errorStatement.execute(error) } } - val StatementMetrics = expectStatementsMetrics(metricsListener, 3 seconds) - StatementMetrics.errors.count should be(10) - } - } - } - trait StatementsMetricsListenerFixture { - def subscribeToMetrics(): TestProbe = { - val metricsListener = TestProbe() - Kamon(Metrics).subscribe(StatementsMetrics, "*", metricsListener.ref, permanently = true) - // Wait for one empty snapshot before proceeding to the test. - metricsListener.expectMsgType[TickMetricSnapshot] - metricsListener - } - } + TraceContext.currentContext.finish() + } - def expectStatementsMetrics(listener: TestProbe, waitTime: FiniteDuration): StatementsMetricsSnapshot = { - val tickSnapshot = within(waitTime) { - listener.expectMsgType[TickMetricSnapshot] + val jdbcSnapshot = takeSnapshotOf("jdbc-statements", "jdbc-statements") + jdbcSnapshot.counter("errors").get.count should be(10) } - val statementsMetricsOption = tickSnapshot.metrics.get(StatementsMetrics(StatementInstrumentation.Statements)) - statementsMetricsOption should not be empty - statementsMetricsOption.get.asInstanceOf[StatementsMetricsSnapshot] - } - - def takeSnapshotOf(traceName: String): TraceMetricsSnapshot = { - val recorder = Kamon(Metrics)(system).register(TraceMetrics(traceName), TraceMetrics.Factory) - val collectionContext = Kamon(Metrics)(system).buildDefaultCollectionContext - recorder.get.collect(collectionContext) } } |