From 5413b1144610c932b728af79f9224df48c1c5447 Mon Sep 17 00:00:00 2001 From: Diego Date: Thu, 4 Dec 2014 02:05:33 -0300 Subject: + kamon-jdbc: introduce Jdbc segments --- kamon-jdbc/src/main/resources/reference.conf | 3 ++ kamon-jdbc/src/main/scala/kamon/jdbc/Jdbc.scala | 14 +++++++ .../instrumentation/StatementInstrumentation.scala | 18 +++++++-- .../StatementInstrumentationSpec.scala | 45 ++++++++++++++++++++-- 4 files changed, 74 insertions(+), 6 deletions(-) (limited to 'kamon-jdbc') diff --git a/kamon-jdbc/src/main/resources/reference.conf b/kamon-jdbc/src/main/resources/reference.conf index bbe98127..7c905f0b 100644 --- a/kamon-jdbc/src/main/resources/reference.conf +++ b/kamon-jdbc/src/main/resources/reference.conf @@ -11,6 +11,9 @@ kamon { # Fully qualified name of the implementation of kamon.jdbc.SqlErrorProcessor. sql-error-processor = kamon.jdbc.DefaultSqlErrorProcessor + + # Fully qualified name of the implementation of kamon.jdbc.JdbcNameGenerator that will be used for assigning names to segments. + name-generator = kamon.jdbc.DefaultJdbcNameGenerator } metrics { diff --git a/kamon-jdbc/src/main/scala/kamon/jdbc/Jdbc.scala b/kamon-jdbc/src/main/scala/kamon/jdbc/Jdbc.scala index 9de65821..036789b7 100644 --- a/kamon-jdbc/src/main/scala/kamon/jdbc/Jdbc.scala +++ b/kamon-jdbc/src/main/scala/kamon/jdbc/Jdbc.scala @@ -23,11 +23,16 @@ import kamon.Kamon object Jdbc extends ExtensionId[JdbcExtension] with ExtensionIdProvider { override def lookup(): ExtensionId[_ <: Extension] = Jdbc override def createExtension(system: ExtendedActorSystem): JdbcExtension = new JdbcExtension(system) + + val SegmentLibraryName = "jdbc" } class JdbcExtension(system: ExtendedActorSystem) extends Kamon.Extension { private val config = system.settings.config.getConfig("kamon.jdbc") + private val nameGeneratorFQN = config.getString("name-generator") + private val nameGenerator: JdbcNameGenerator = system.dynamicAccess.createInstanceFor[JdbcNameGenerator](nameGeneratorFQN, Nil).get + private val slowQueryProcessorClass = config.getString("slow-query-processor") private val slowQueryProcessor: SlowQueryProcessor = system.dynamicAccess.createInstanceFor[SlowQueryProcessor](slowQueryProcessorClass, Nil).get @@ -38,6 +43,7 @@ class JdbcExtension(system: ExtendedActorSystem) extends Kamon.Extension { def processSlowQuery(sql: String, executionTime: Long) = slowQueryProcessor.process(sql, executionTime, slowQueryThreshold) def processSqlError(sql: String, ex: Throwable) = sqlErrorProcessor.process(sql, ex) + def generateJdbcSegmentName(statement: String): String = nameGenerator.generateJdbcSegmentName(statement) } trait SlowQueryProcessor { @@ -48,6 +54,14 @@ trait SqlErrorProcessor { def process(sql: String, ex: Throwable): Unit } +trait JdbcNameGenerator { + def generateJdbcSegmentName(statement: String): String +} + +class DefaultJdbcNameGenerator extends JdbcNameGenerator { + def generateJdbcSegmentName(statement: String): String = s"Jdbc[$statement]" +} + class DefaultSqlErrorProcessor extends SqlErrorProcessor { import org.slf4j.LoggerFactory diff --git a/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala b/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala index 41eef716..b2f35c6c 100644 --- a/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala +++ b/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala @@ -23,7 +23,7 @@ import kamon.jdbc.Jdbc import kamon.jdbc.metric.StatementsMetrics import kamon.jdbc.metric.StatementsMetricsGroupFactory.GroupRecorder import kamon.metric.Metrics -import kamon.trace.TraceRecorder +import kamon.trace.{ TraceContext, SegmentCategory, TraceRecorder } import org.aspectj.lang.ProceedingJoinPoint import org.aspectj.lang.annotation.{ Around, Aspect, Pointcut } import org.slf4j.LoggerFactory @@ -51,8 +51,10 @@ class StatementInstrumentation { implicit val statementRecorder: Option[GroupRecorder] = Kamon(Metrics)(system).register(StatementsMetrics(Statements), StatementsMetrics.Factory) sql.replaceAll(CommentPattern, Empty) match { - case SelectStatement(_) ⇒ recordRead(pjp, sql, system) - case InsertStatement(_) | UpdateStatement(_) | DeleteStatement(_) ⇒ recordWrite(pjp, sql, system) + case SelectStatement(_) ⇒ withSegment(ctx, system, Select)(recordRead(pjp, sql, system)) + case InsertStatement(_) ⇒ withSegment(ctx, system, Insert)(recordWrite(pjp, sql, system)) + case UpdateStatement(_) ⇒ withSegment(ctx, system, Update)(recordWrite(pjp, sql, system)) + case DeleteStatement(_) ⇒ withSegment(ctx, system, Delete)(recordWrite(pjp, sql, system)) case anythingElse ⇒ log.debug(s"Unable to parse sql [$sql]") pjp.proceed() @@ -65,6 +67,12 @@ class StatementInstrumentation { try thunk finally timeSpent(System.nanoTime() - start) } + def withSegment[A](ctx: TraceContext, system: ActorSystem, statement: String)(thunk: ⇒ A): A = { + val segmentName = Jdbc(system).generateJdbcSegmentName(statement) + val segment = ctx.startSegment(segmentName, SegmentCategory.Database, Jdbc.SegmentLibraryName) + try thunk finally segment.finish() + } + def recordRead(pjp: ProceedingJoinPoint, sql: String, system: ActorSystem)(implicit statementRecorder: Option[GroupRecorder]): Any = { withTimeSpent(pjp.proceedWithErrorHandler(sql, system)) { timeSpent ⇒ statementRecorder.map(stmr ⇒ stmr.reads.record(timeSpent)) @@ -95,6 +103,10 @@ object StatementInstrumentation { val CommentPattern = "/\\*.*?\\*/" //for now only removes comments of kind / * anything * / val Empty = "" val Statements = "jdbc-statements" + val Select = "Select" + val Insert = "Insert" + val Update = "Update" + val Delete = "Delete" implicit class PimpedProceedingJoinPoint(pjp: ProceedingJoinPoint) { def proceedWithErrorHandler(sql: String, system: ActorSystem)(implicit statementRecorder: Option[GroupRecorder]): Any = { diff --git a/kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala b/kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala index 9d3c7124..534edd57 100644 --- a/kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala +++ b/kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala @@ -21,12 +21,13 @@ import akka.actor.ActorSystem import akka.testkit.{ TestKitBase, TestProbe } import com.typesafe.config.ConfigFactory import kamon.Kamon -import kamon.jdbc.{ SqlErrorProcessor, SlowQueryProcessor } +import kamon.jdbc.{ Jdbc, JdbcNameGenerator, SqlErrorProcessor, SlowQueryProcessor } import kamon.jdbc.metric.StatementsMetrics import kamon.jdbc.metric.StatementsMetrics.StatementsMetricsSnapshot -import kamon.metric.Metrics +import kamon.metric.{ TraceMetrics, Metrics } import kamon.metric.Subscriptions.TickMetricSnapshot -import kamon.trace.TraceRecorder +import kamon.metric.TraceMetrics.TraceMetricsSnapshot +import kamon.trace.{ SegmentCategory, SegmentMetricIdentity, TraceRecorder } import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike } import scala.concurrent.duration._ @@ -44,6 +45,9 @@ class StatementInstrumentationSpec extends TestKitBase with WordSpecLike with Ma | | # Fully qualified name of the implementation of kamon.jdbc.SqlErrorProcessor. | sql-error-processor = kamon.jdbc.instrumentation.NoOpSqlErrorProcessor + | + | # Fully qualified name of the implementation of kamon.jdbc.JdbcNameGenerator + | name-generator = kamon.jdbc.instrumentation.NoOpJdbcNameGenerator | } |} """.stripMargin)) @@ -76,7 +80,14 @@ class StatementInstrumentationSpec extends TestKitBase with WordSpecLike with Ma val StatementMetrics = expectStatementsMetrics(metricsListener, 3 seconds) StatementMetrics.writes.numberOfMeasurements should be(100) + + TraceRecorder.finish() } + + val snapshot = takeSnapshotOf("jdbc-trace-insert") + snapshot.elapsedTime.numberOfMeasurements should be(1) + snapshot.segments.size should be(1) + snapshot.segments(SegmentMetricIdentity("Jdbc[Insert]", SegmentCategory.Database, Jdbc.SegmentLibraryName)).numberOfMeasurements should be(100) } "record the execution time of SELECT operation" in new StatementsMetricsListenerFixture { @@ -92,7 +103,14 @@ class StatementInstrumentationSpec extends TestKitBase with WordSpecLike with Ma val StatementMetrics = expectStatementsMetrics(metricsListener, 3 seconds) StatementMetrics.reads.numberOfMeasurements should be(100) + + TraceRecorder.finish() } + + val snapshot = takeSnapshotOf("jdbc-trace-select") + snapshot.elapsedTime.numberOfMeasurements should be(1) + snapshot.segments.size should be(1) + snapshot.segments(SegmentMetricIdentity("Jdbc[Select]", SegmentCategory.Database, Jdbc.SegmentLibraryName)).numberOfMeasurements should be(100) } "record the execution time of UPDATE operation" in new StatementsMetricsListenerFixture { @@ -108,7 +126,13 @@ class StatementInstrumentationSpec extends TestKitBase with WordSpecLike with Ma val StatementMetrics = expectStatementsMetrics(metricsListener, 3 seconds) StatementMetrics.writes.numberOfMeasurements should be(100) + TraceRecorder.finish() } + + val snapshot = takeSnapshotOf("jdbc-trace-update") + snapshot.elapsedTime.numberOfMeasurements should be(1) + snapshot.segments.size should be(1) + snapshot.segments(SegmentMetricIdentity("Jdbc[Update]", SegmentCategory.Database, Jdbc.SegmentLibraryName)).numberOfMeasurements should be(100) } "record the execution time of DELETE operation" in new StatementsMetricsListenerFixture { @@ -124,7 +148,13 @@ class StatementInstrumentationSpec extends TestKitBase with WordSpecLike with Ma val StatementMetrics = expectStatementsMetrics(metricsListener, 3 seconds) StatementMetrics.writes.numberOfMeasurements should be(100) + TraceRecorder.finish() } + + val snapshot = takeSnapshotOf("jdbc-trace-delete") + snapshot.elapsedTime.numberOfMeasurements should be(1) + snapshot.segments.size should be(1) + snapshot.segments(SegmentMetricIdentity("Jdbc[Delete]", SegmentCategory.Database, Jdbc.SegmentLibraryName)).numberOfMeasurements should be(100) } "record the execution time of SLOW QUERIES based on the kamon.jdbc.slow-query-threshold" in new StatementsMetricsListenerFixture { @@ -179,6 +209,12 @@ class StatementInstrumentationSpec extends TestKitBase with WordSpecLike with Ma statementsMetricsOption should not be empty statementsMetricsOption.get.asInstanceOf[StatementsMetricsSnapshot] } + + def takeSnapshotOf(traceName: String): TraceMetricsSnapshot = { + val recorder = Kamon(Metrics)(system).register(TraceMetrics(traceName), TraceMetrics.Factory) + val collectionContext = Kamon(Metrics)(system).buildDefaultCollectionContext + recorder.get.collect(collectionContext) + } } class NoOpSlowQueryProcessor extends SlowQueryProcessor { @@ -189,3 +225,6 @@ class NoOpSqlErrorProcessor extends SqlErrorProcessor { override def process(sql: String, ex: Throwable): Unit = { /*do nothing!!!*/ } } +class NoOpJdbcNameGenerator extends JdbcNameGenerator { + override def generateJdbcSegmentName(statement: String): String = s"Jdbc[$statement]" +} \ No newline at end of file -- cgit v1.2.3