diff options
author | Diego <diegolparra@gmail.com> | 2014-11-24 16:49:37 -0300 |
---|---|---|
committer | Diego <diegolparra@gmail.com> | 2014-12-04 02:27:38 -0300 |
commit | 4cf69bc1367bbe75623cbab40e653ea08b2e0341 (patch) | |
tree | 07ec52029aee03c9bc89378353229f04a296e70b /kamon-jdbc/src | |
parent | ab9fd324a5df3e411e952576b78dc627595e00bf (diff) | |
download | Kamon-4cf69bc1367bbe75623cbab40e653ea08b2e0341.tar.gz Kamon-4cf69bc1367bbe75623cbab40e653ea08b2e0341.tar.bz2 Kamon-4cf69bc1367bbe75623cbab40e653ea08b2e0341.zip |
+ kamon-jdbc: * include StatementInstrumentationSpec
Diffstat (limited to 'kamon-jdbc/src')
5 files changed, 192 insertions, 57 deletions
diff --git a/kamon-jdbc/src/main/resources/META-INF/aop.xml b/kamon-jdbc/src/main/resources/META-INF/aop.xml index d9ac097f..dd1bbbbe 100644 --- a/kamon-jdbc/src/main/resources/META-INF/aop.xml +++ b/kamon-jdbc/src/main/resources/META-INF/aop.xml @@ -5,7 +5,7 @@ <aspect name="kamon.jdbc.instrumentation.StatementInstrumentation"/> </aspects> - <weaver options="-verbose"> + <weaver> <include within="java.sql.Statement+..*"/> <include within="java.sql.Connection+..*"/> </weaver> diff --git a/kamon-jdbc/src/main/scala/kamon/jdbc/Jdbc.scala b/kamon-jdbc/src/main/scala/kamon/jdbc/Jdbc.scala index 7442fb96..3199d27b 100644 --- a/kamon-jdbc/src/main/scala/kamon/jdbc/Jdbc.scala +++ b/kamon-jdbc/src/main/scala/kamon/jdbc/Jdbc.scala @@ -1,3 +1,18 @@ +/* ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License") you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + package kamon.jdbc import java.util.concurrent.TimeUnit.{ MILLISECONDS ⇒ milliseconds } @@ -29,7 +44,7 @@ class DefaultSlowQueryProcessor extends SlowQueryProcessor { val log = LoggerFactory.getLogger("slow-query-processor") - override def process(sql: String, executionTime: Long, queryThreshold: Long): Unit = { - log.warn(s"The query: $sql took ${executionTime}ms and the slow query threshold is ${queryThreshold}ms") + override def process(sql: String, executionTimeInMillis: Long, queryThresholdInMillis: Long): Unit = { + log.warn(s"The query [$sql] took $executionTimeInMillis ms and the slow query threshold is $queryThresholdInMillis ms") } } diff --git a/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala b/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala index faa068aa..f0a83007 100644 --- a/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala +++ b/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala @@ -16,7 +16,7 @@ package kamon.jdbc.instrumentation import java.sql.SQLException - +import java.util.concurrent.TimeUnit.{ NANOSECONDS ⇒ nanos } import akka.actor.ActorSystem import kamon.Kamon import kamon.jdbc.Jdbc @@ -48,11 +48,11 @@ class StatementInstrumentation { def aroundExecuteStatement(pjp: ProceedingJoinPoint, sql: String): Any = { TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒ - if (statementRecorder.nonEmpty) { - statementRecorder = Kamon(Metrics)(system).register(StatementsMetrics("Statements"), StatementsMetrics.Factory) + if (statementRecorder.isEmpty) { + statementRecorder = Kamon(Metrics)(system).register(StatementsMetrics(Statements), StatementsMetrics.Factory) } - sql.replaceAll(CommentPattern, "") match { + sql.replaceAll(CommentPattern, Empty) match { case SelectStatement(_) ⇒ recordRead(pjp, sql)(system) case InsertStatement(_) | UpdateStatement(_) | DeleteStatement(_) ⇒ recordWrite(pjp) case anythingElse ⇒ @@ -63,7 +63,7 @@ class StatementInstrumentation { } @AfterThrowing(pointcut = "onExecuteStatement(sql) || onExecutePreparedStatement(sql) || onExecutePreparedCall(sql)", throwing = "ex") - def onError(sql: String, ex:SQLException): Unit = { + def onError(sql: String, ex: SQLException): Unit = { log.error(s"the query [$sql] failed with exception [${ex.getMessage}]") statementRecorder.map(stmr ⇒ stmr.errors.increment()) } @@ -74,20 +74,21 @@ class StatementInstrumentation { } def recordRead(pjp: ProceedingJoinPoint, sql: String)(system: ActorSystem): Any = { - withTimeSpent(pjp.proceed()) { - timeSpent ⇒ - statementRecorder.map(stmr ⇒ stmr.reads.record(timeSpent)) - - if (timeSpent >= Jdbc(system).slowQueryThreshold) { - statementRecorder.map(stmr ⇒ stmr.slow.increment()) - Jdbc(system).processSlowQuery(sql, timeSpent) - } + withTimeSpent(pjp.proceed()) { timeSpent ⇒ + statementRecorder.map(stmr ⇒ stmr.reads.record(timeSpent)) + + val timeSpentInMillis = nanos.toMillis(timeSpent) + + if (timeSpentInMillis >= Jdbc(system).slowQueryThreshold) { + statementRecorder.map(stmr ⇒ stmr.slow.increment()) + Jdbc(system).processSlowQuery(sql, timeSpentInMillis) + } } } def recordWrite(pjp: ProceedingJoinPoint): Any = { - withTimeSpent(pjp.proceed()) { - timeSpent ⇒ statementRecorder.map(stmr ⇒ stmr.writes.record(timeSpent)) + withTimeSpent(pjp.proceed()) { timeSpent ⇒ + statementRecorder.map(stmr ⇒ stmr.writes.record(timeSpent)) } } } @@ -100,5 +101,7 @@ object StatementInstrumentation { val UpdateStatement = "(?i)^\\s*update\\s+([^\\s,;]*).*".r val DeleteStatement = "(?i)^\\s*delete\\s+from\\s+([^\\s,(;]*).*".r val CommentPattern = "/\\*.*?\\*/" //for now only removes comments of kind / * anything * / + val Empty = "" + val Statements = "jdbc-statements" } diff --git a/kamon-jdbc/src/main/scala/kamon/jdbc/metric/StatementsMetrics.scala b/kamon-jdbc/src/main/scala/kamon/jdbc/metric/StatementsMetrics.scala index bc52c249..7ba8b105 100644 --- a/kamon-jdbc/src/main/scala/kamon/jdbc/metric/StatementsMetrics.scala +++ b/kamon-jdbc/src/main/scala/kamon/jdbc/metric/StatementsMetrics.scala @@ -43,19 +43,19 @@ object StatementsMetrics extends MetricGroupCategory { def cleanup: Unit = {} } - case class StatementsMetricsSnapshot(writes: Histogram.Snapshot, reads: Histogram.Snapshot, slow: Counter.Snapshot, errors: Counter.Snapshot) + case class StatementsMetricsSnapshot(writes: Histogram.Snapshot, reads: Histogram.Snapshot, slows: Counter.Snapshot, errors: Counter.Snapshot) extends MetricGroupSnapshot { type GroupSnapshotType = StatementsMetricsSnapshot def merge(that: StatementsMetricsSnapshot, context: CollectionContext): GroupSnapshotType = { - StatementsMetricsSnapshot(writes.merge(that.writes, context), reads.merge(that.reads, context), slow.merge(that.slow, context), errors.merge(that.errors, context)) + StatementsMetricsSnapshot(writes.merge(that.writes, context), reads.merge(that.reads, context), slows.merge(that.slows, context), errors.merge(that.errors, context)) } lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map( Writes -> writes, Reads -> reads, - Slows -> slow, + Slows -> slows, Reads -> errors) } diff --git a/kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala b/kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala index ec8ca4a1..ee68234d 100644 --- a/kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala +++ b/kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala @@ -15,53 +15,170 @@ package kamon.jdbc.instrumentation -import java.sql.{SQLException, DriverManager} +import java.sql.{DriverManager, SQLException} import akka.actor.ActorSystem -import akka.testkit.TestKit +import akka.testkit.{TestKitBase, TestProbe} +import com.typesafe.config.ConfigFactory +import kamon.Kamon +import kamon.jdbc.SlowQueryProcessor +import kamon.jdbc.metric.StatementsMetrics +import kamon.jdbc.metric.StatementsMetrics.StatementsMetricsSnapshot +import kamon.metric.Metrics +import kamon.metric.Subscriptions.TickMetricSnapshot import kamon.trace.TraceRecorder -import org.scalatest.{Matchers, WordSpecLike} +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} +import scala.concurrent.duration._ -class StatementInstrumentationSpec extends TestKit(ActorSystem("jdbc-spec")) with WordSpecLike with Matchers { +class StatementInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with BeforeAndAfterAll { - val connection = DriverManager.getConnection("jdbc:h2:mem:test","SA", "") + implicit lazy val system: ActorSystem = ActorSystem("jdbc-spec", ConfigFactory.parseString( + """ + |kamon { + | jdbc { + | slow-query-threshold = 100 milliseconds + | + | # Fully qualified name of the implementation of kamon.jdbc.SlowQueryProcessor. + | slow-query-processor = kamon.jdbc.instrumentation.NOPSlowQueryProcessor + | } + |} + """.stripMargin)) + + val connection = DriverManager.getConnection("jdbc:h2:mem:jdbc-spec","SA", "") + + override protected def beforeAll(): Unit = { + connection should not be null + + val create = "CREATE TABLE Address (Nr INTEGER, Name VARCHAR(128));" + val createStatement = connection.createStatement() + createStatement.executeUpdate(create) + + val sleep = "CREATE ALIAS SLEEP FOR \"java.lang.Thread.sleep(long)\"" + val sleepStatement = connection.createStatement() + sleepStatement.executeUpdate(sleep) + } "the StatementInstrumentation" should { - "bblabals" in { - TraceRecorder.withNewTraceContext("jdbc-trace") { - connection should not be null - - val create = "CREATE TABLE Address (Nr INTEGER, Name VARCHAR(128));" - val createStatement = connection.createStatement() - createStatement.executeUpdate(create) - - val insert = "INSERT INTO Address (Nr, Name) VALUES(1, 'foo')" - val insertStatement = connection.prepareStatement(insert) - insertStatement.execute() - - val select = - """ - |/*this is a comment*/ - |SELECT * FROM Address""".stripMargin - val selectStatement = connection.prepareCall(select) - selectStatement.execute() - - val update = "UPDATE Address SET Name = 'bar' where Nr = 1" - val updateStatement = connection.createStatement() - updateStatement.execute(update) - - val delete = "DELETE FROM Address where Nr = 1" - val deleteStatement = connection.createStatement() - deleteStatement.execute(delete) - - intercept[SQLException] { - val error = "SELECT * FROM NON_EXIST_TABLE" - val errorStatement = connection.createStatement() - errorStatement.execute(error) + "record the execution time of the INSERT operation" in new StatementsMetricsListenerFixture { + TraceRecorder.withNewTraceContext("jdbc-trace-insert") { + + val metricsListener = subscribeToMetrics() + + for(id <- 1 to 100) { + val insert = s"INSERT INTO Address (Nr, Name) VALUES($id, 'foo')" + val insertStatement = connection.prepareStatement(insert) + insertStatement.execute() + } + + val StatementMetrics = expectStatementsMetrics(metricsListener, 3 seconds) + StatementMetrics.writes.numberOfMeasurements should be(100) + } + } + + "record the execution time of SELECT operation" in new StatementsMetricsListenerFixture { + TraceRecorder.withNewTraceContext("jdbc-trace-select") { + + val metricsListener = subscribeToMetrics() + + for(id <- 1 to 100) { + val select = s"SELECT * FROM Address where Nr = $id" + val selectStatement = connection.createStatement() + selectStatement.execute(select) + } + + val StatementMetrics = expectStatementsMetrics(metricsListener, 3 seconds) + StatementMetrics.reads.numberOfMeasurements should be(100) + } + } + + "record the execution time of UPDATE operation" in new StatementsMetricsListenerFixture { + TraceRecorder.withNewTraceContext("jdbc-trace-update") { + + val metricsListener = subscribeToMetrics() + + for(id <- 1 to 100) { + val update = s"UPDATE Address SET Name = 'bar$id' where Nr = $id" + val updateStatement = connection.prepareStatement(update) + updateStatement.execute() + } + + val StatementMetrics = expectStatementsMetrics(metricsListener, 3 seconds) + StatementMetrics.writes.numberOfMeasurements should be(100) + } + } + + "record the execution time of DELETE operation" in new StatementsMetricsListenerFixture { + TraceRecorder.withNewTraceContext("jdbc-trace-insert") { + + val metricsListener = subscribeToMetrics() + + for(id <- 1 to 100) { + val delete = s"DELETE FROM Address where Nr = $id" + val deleteStatement = connection.createStatement() + deleteStatement.execute(delete) } + val StatementMetrics = expectStatementsMetrics(metricsListener, 3 seconds) + StatementMetrics.writes.numberOfMeasurements should be(100) } } + + "record the execution time of SLOW QUERIES based on the kamon.jdbc.slow-query-threshold" in new StatementsMetricsListenerFixture { + TraceRecorder.withNewTraceContext("jdbc-trace-slow") { + + val metricsListener = subscribeToMetrics() + + for(id <- 1 to 2) { + val select = s"SELECT * FROM Address; CALL SLEEP(100)" + val selectStatement = connection.createStatement() + selectStatement.execute(select) + } + + val StatementMetrics = expectStatementsMetrics(metricsListener, 3 seconds) + StatementMetrics.slows.count should be(2) + } + } + + "count the total ERRORS" in new StatementsMetricsListenerFixture { + TraceRecorder.withNewTraceContext("jdbc-trace-slow") { + + val metricsListener = subscribeToMetrics() + + for(_ <- 1 to 10) { + intercept[SQLException] { + val error = "SELECT * FROM NON_EXIST_TABLE" + val errorStatement = connection.createStatement() + errorStatement.execute(error) + } + } + val StatementMetrics = expectStatementsMetrics(metricsListener, 3 seconds) + StatementMetrics.errors.count should be(10) + } + } + } + + trait StatementsMetricsListenerFixture { + def subscribeToMetrics( ): TestProbe = { + val metricsListener = TestProbe() + Kamon(Metrics).subscribe(StatementsMetrics, "*", metricsListener.ref, permanently = true) + // Wait for one empty snapshot before proceeding to the test. + metricsListener.expectMsgType[TickMetricSnapshot] + metricsListener + } + } + + def expectStatementsMetrics(listener: TestProbe, waitTime: FiniteDuration): StatementsMetricsSnapshot = { + val tickSnapshot = within(waitTime) { + listener.expectMsgType[TickMetricSnapshot] + } + val statementsMetricsOption = tickSnapshot.metrics.get(StatementsMetrics(StatementInstrumentation.Statements)) + statementsMetricsOption should not be empty + statementsMetricsOption.get.asInstanceOf[StatementsMetricsSnapshot] } } +class NOPSlowQueryProcessor extends SlowQueryProcessor { + override def process(sql: String, executionTimeInMillis: Long, queryThresholdInMillis: Long): Unit = {/*do nothing!!!*/} +} + + |