aboutsummaryrefslogtreecommitdiff
path: root/kamon-jdbc
diff options
context:
space:
mode:
authorDiego <diegolparra@gmail.com>2014-12-04 02:05:33 -0300
committerDiego <diegolparra@gmail.com>2014-12-04 02:27:39 -0300
commit5413b1144610c932b728af79f9224df48c1c5447 (patch)
treead41279ec3b01ee844ba5e0809b4cf90a1699700 /kamon-jdbc
parent8d07527b1bd69c2165b6d4f1f97d852da3e6f0b0 (diff)
downloadKamon-5413b1144610c932b728af79f9224df48c1c5447.tar.gz
Kamon-5413b1144610c932b728af79f9224df48c1c5447.tar.bz2
Kamon-5413b1144610c932b728af79f9224df48c1c5447.zip
+ kamon-jdbc: introduce Jdbc segments
Diffstat (limited to 'kamon-jdbc')
-rw-r--r--kamon-jdbc/src/main/resources/reference.conf3
-rw-r--r--kamon-jdbc/src/main/scala/kamon/jdbc/Jdbc.scala14
-rw-r--r--kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala18
-rw-r--r--kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala45
4 files changed, 74 insertions, 6 deletions
diff --git a/kamon-jdbc/src/main/resources/reference.conf b/kamon-jdbc/src/main/resources/reference.conf
index bbe98127..7c905f0b 100644
--- a/kamon-jdbc/src/main/resources/reference.conf
+++ b/kamon-jdbc/src/main/resources/reference.conf
@@ -11,6 +11,9 @@ kamon {
# Fully qualified name of the implementation of kamon.jdbc.SqlErrorProcessor.
sql-error-processor = kamon.jdbc.DefaultSqlErrorProcessor
+
+ # Fully qualified name of the implementation of kamon.jdbc.JdbcNameGenerator that will be used for assigning names to segments.
+ name-generator = kamon.jdbc.DefaultJdbcNameGenerator
}
metrics {
diff --git a/kamon-jdbc/src/main/scala/kamon/jdbc/Jdbc.scala b/kamon-jdbc/src/main/scala/kamon/jdbc/Jdbc.scala
index 9de65821..036789b7 100644
--- a/kamon-jdbc/src/main/scala/kamon/jdbc/Jdbc.scala
+++ b/kamon-jdbc/src/main/scala/kamon/jdbc/Jdbc.scala
@@ -23,11 +23,16 @@ import kamon.Kamon
object Jdbc extends ExtensionId[JdbcExtension] with ExtensionIdProvider {
override def lookup(): ExtensionId[_ <: Extension] = Jdbc
override def createExtension(system: ExtendedActorSystem): JdbcExtension = new JdbcExtension(system)
+
+ val SegmentLibraryName = "jdbc"
}
class JdbcExtension(system: ExtendedActorSystem) extends Kamon.Extension {
private val config = system.settings.config.getConfig("kamon.jdbc")
+ private val nameGeneratorFQN = config.getString("name-generator")
+ private val nameGenerator: JdbcNameGenerator = system.dynamicAccess.createInstanceFor[JdbcNameGenerator](nameGeneratorFQN, Nil).get
+
private val slowQueryProcessorClass = config.getString("slow-query-processor")
private val slowQueryProcessor: SlowQueryProcessor = system.dynamicAccess.createInstanceFor[SlowQueryProcessor](slowQueryProcessorClass, Nil).get
@@ -38,6 +43,7 @@ class JdbcExtension(system: ExtendedActorSystem) extends Kamon.Extension {
def processSlowQuery(sql: String, executionTime: Long) = slowQueryProcessor.process(sql, executionTime, slowQueryThreshold)
def processSqlError(sql: String, ex: Throwable) = sqlErrorProcessor.process(sql, ex)
+ def generateJdbcSegmentName(statement: String): String = nameGenerator.generateJdbcSegmentName(statement)
}
trait SlowQueryProcessor {
@@ -48,6 +54,14 @@ trait SqlErrorProcessor {
def process(sql: String, ex: Throwable): Unit
}
+trait JdbcNameGenerator {
+ def generateJdbcSegmentName(statement: String): String
+}
+
+class DefaultJdbcNameGenerator extends JdbcNameGenerator {
+ def generateJdbcSegmentName(statement: String): String = s"Jdbc[$statement]"
+}
+
class DefaultSqlErrorProcessor extends SqlErrorProcessor {
import org.slf4j.LoggerFactory
diff --git a/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala b/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala
index 41eef716..b2f35c6c 100644
--- a/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala
+++ b/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala
@@ -23,7 +23,7 @@ import kamon.jdbc.Jdbc
import kamon.jdbc.metric.StatementsMetrics
import kamon.jdbc.metric.StatementsMetricsGroupFactory.GroupRecorder
import kamon.metric.Metrics
-import kamon.trace.TraceRecorder
+import kamon.trace.{ TraceContext, SegmentCategory, TraceRecorder }
import org.aspectj.lang.ProceedingJoinPoint
import org.aspectj.lang.annotation.{ Around, Aspect, Pointcut }
import org.slf4j.LoggerFactory
@@ -51,8 +51,10 @@ class StatementInstrumentation {
implicit val statementRecorder: Option[GroupRecorder] = Kamon(Metrics)(system).register(StatementsMetrics(Statements), StatementsMetrics.Factory)
sql.replaceAll(CommentPattern, Empty) match {
- case SelectStatement(_) ⇒ recordRead(pjp, sql, system)
- case InsertStatement(_) | UpdateStatement(_) | DeleteStatement(_) ⇒ recordWrite(pjp, sql, system)
+ case SelectStatement(_) ⇒ withSegment(ctx, system, Select)(recordRead(pjp, sql, system))
+ case InsertStatement(_) ⇒ withSegment(ctx, system, Insert)(recordWrite(pjp, sql, system))
+ case UpdateStatement(_) ⇒ withSegment(ctx, system, Update)(recordWrite(pjp, sql, system))
+ case DeleteStatement(_) ⇒ withSegment(ctx, system, Delete)(recordWrite(pjp, sql, system))
case anythingElse ⇒
log.debug(s"Unable to parse sql [$sql]")
pjp.proceed()
@@ -65,6 +67,12 @@ class StatementInstrumentation {
try thunk finally timeSpent(System.nanoTime() - start)
}
+ def withSegment[A](ctx: TraceContext, system: ActorSystem, statement: String)(thunk: ⇒ A): A = {
+ val segmentName = Jdbc(system).generateJdbcSegmentName(statement)
+ val segment = ctx.startSegment(segmentName, SegmentCategory.Database, Jdbc.SegmentLibraryName)
+ try thunk finally segment.finish()
+ }
+
def recordRead(pjp: ProceedingJoinPoint, sql: String, system: ActorSystem)(implicit statementRecorder: Option[GroupRecorder]): Any = {
withTimeSpent(pjp.proceedWithErrorHandler(sql, system)) { timeSpent ⇒
statementRecorder.map(stmr ⇒ stmr.reads.record(timeSpent))
@@ -95,6 +103,10 @@ object StatementInstrumentation {
val CommentPattern = "/\\*.*?\\*/" //for now only removes comments of kind / * anything * /
val Empty = ""
val Statements = "jdbc-statements"
+ val Select = "Select"
+ val Insert = "Insert"
+ val Update = "Update"
+ val Delete = "Delete"
implicit class PimpedProceedingJoinPoint(pjp: ProceedingJoinPoint) {
def proceedWithErrorHandler(sql: String, system: ActorSystem)(implicit statementRecorder: Option[GroupRecorder]): Any = {
diff --git a/kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala b/kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala
index 9d3c7124..534edd57 100644
--- a/kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala
+++ b/kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala
@@ -21,12 +21,13 @@ import akka.actor.ActorSystem
import akka.testkit.{ TestKitBase, TestProbe }
import com.typesafe.config.ConfigFactory
import kamon.Kamon
-import kamon.jdbc.{ SqlErrorProcessor, SlowQueryProcessor }
+import kamon.jdbc.{ Jdbc, JdbcNameGenerator, SqlErrorProcessor, SlowQueryProcessor }
import kamon.jdbc.metric.StatementsMetrics
import kamon.jdbc.metric.StatementsMetrics.StatementsMetricsSnapshot
-import kamon.metric.Metrics
+import kamon.metric.{ TraceMetrics, Metrics }
import kamon.metric.Subscriptions.TickMetricSnapshot
-import kamon.trace.TraceRecorder
+import kamon.metric.TraceMetrics.TraceMetricsSnapshot
+import kamon.trace.{ SegmentCategory, SegmentMetricIdentity, TraceRecorder }
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike }
import scala.concurrent.duration._
@@ -44,6 +45,9 @@ class StatementInstrumentationSpec extends TestKitBase with WordSpecLike with Ma
|
| # Fully qualified name of the implementation of kamon.jdbc.SqlErrorProcessor.
| sql-error-processor = kamon.jdbc.instrumentation.NoOpSqlErrorProcessor
+ |
+ | # Fully qualified name of the implementation of kamon.jdbc.JdbcNameGenerator
+ | name-generator = kamon.jdbc.instrumentation.NoOpJdbcNameGenerator
| }
|}
""".stripMargin))
@@ -76,7 +80,14 @@ class StatementInstrumentationSpec extends TestKitBase with WordSpecLike with Ma
val StatementMetrics = expectStatementsMetrics(metricsListener, 3 seconds)
StatementMetrics.writes.numberOfMeasurements should be(100)
+
+ TraceRecorder.finish()
}
+
+ val snapshot = takeSnapshotOf("jdbc-trace-insert")
+ snapshot.elapsedTime.numberOfMeasurements should be(1)
+ snapshot.segments.size should be(1)
+ snapshot.segments(SegmentMetricIdentity("Jdbc[Insert]", SegmentCategory.Database, Jdbc.SegmentLibraryName)).numberOfMeasurements should be(100)
}
"record the execution time of SELECT operation" in new StatementsMetricsListenerFixture {
@@ -92,7 +103,14 @@ class StatementInstrumentationSpec extends TestKitBase with WordSpecLike with Ma
val StatementMetrics = expectStatementsMetrics(metricsListener, 3 seconds)
StatementMetrics.reads.numberOfMeasurements should be(100)
+
+ TraceRecorder.finish()
}
+
+ val snapshot = takeSnapshotOf("jdbc-trace-select")
+ snapshot.elapsedTime.numberOfMeasurements should be(1)
+ snapshot.segments.size should be(1)
+ snapshot.segments(SegmentMetricIdentity("Jdbc[Select]", SegmentCategory.Database, Jdbc.SegmentLibraryName)).numberOfMeasurements should be(100)
}
"record the execution time of UPDATE operation" in new StatementsMetricsListenerFixture {
@@ -108,7 +126,13 @@ class StatementInstrumentationSpec extends TestKitBase with WordSpecLike with Ma
val StatementMetrics = expectStatementsMetrics(metricsListener, 3 seconds)
StatementMetrics.writes.numberOfMeasurements should be(100)
+ TraceRecorder.finish()
}
+
+ val snapshot = takeSnapshotOf("jdbc-trace-update")
+ snapshot.elapsedTime.numberOfMeasurements should be(1)
+ snapshot.segments.size should be(1)
+ snapshot.segments(SegmentMetricIdentity("Jdbc[Update]", SegmentCategory.Database, Jdbc.SegmentLibraryName)).numberOfMeasurements should be(100)
}
"record the execution time of DELETE operation" in new StatementsMetricsListenerFixture {
@@ -124,7 +148,13 @@ class StatementInstrumentationSpec extends TestKitBase with WordSpecLike with Ma
val StatementMetrics = expectStatementsMetrics(metricsListener, 3 seconds)
StatementMetrics.writes.numberOfMeasurements should be(100)
+ TraceRecorder.finish()
}
+
+ val snapshot = takeSnapshotOf("jdbc-trace-delete")
+ snapshot.elapsedTime.numberOfMeasurements should be(1)
+ snapshot.segments.size should be(1)
+ snapshot.segments(SegmentMetricIdentity("Jdbc[Delete]", SegmentCategory.Database, Jdbc.SegmentLibraryName)).numberOfMeasurements should be(100)
}
"record the execution time of SLOW QUERIES based on the kamon.jdbc.slow-query-threshold" in new StatementsMetricsListenerFixture {
@@ -179,6 +209,12 @@ class StatementInstrumentationSpec extends TestKitBase with WordSpecLike with Ma
statementsMetricsOption should not be empty
statementsMetricsOption.get.asInstanceOf[StatementsMetricsSnapshot]
}
+
+ def takeSnapshotOf(traceName: String): TraceMetricsSnapshot = {
+ val recorder = Kamon(Metrics)(system).register(TraceMetrics(traceName), TraceMetrics.Factory)
+ val collectionContext = Kamon(Metrics)(system).buildDefaultCollectionContext
+ recorder.get.collect(collectionContext)
+ }
}
class NoOpSlowQueryProcessor extends SlowQueryProcessor {
@@ -189,3 +225,6 @@ class NoOpSqlErrorProcessor extends SqlErrorProcessor {
override def process(sql: String, ex: Throwable): Unit = { /*do nothing!!!*/ }
}
+class NoOpJdbcNameGenerator extends JdbcNameGenerator {
+ override def generateJdbcSegmentName(statement: String): String = s"Jdbc[$statement]"
+} \ No newline at end of file