aboutsummaryrefslogtreecommitdiff
path: root/kamon-jdbc
diff options
context:
space:
mode:
authorDiego <diegolparra@gmail.com>2014-11-24 16:49:37 -0300
committerDiego <diegolparra@gmail.com>2014-12-04 02:27:38 -0300
commit4cf69bc1367bbe75623cbab40e653ea08b2e0341 (patch)
tree07ec52029aee03c9bc89378353229f04a296e70b /kamon-jdbc
parentab9fd324a5df3e411e952576b78dc627595e00bf (diff)
downloadKamon-4cf69bc1367bbe75623cbab40e653ea08b2e0341.tar.gz
Kamon-4cf69bc1367bbe75623cbab40e653ea08b2e0341.tar.bz2
Kamon-4cf69bc1367bbe75623cbab40e653ea08b2e0341.zip
+ kamon-jdbc: * include StatementInstrumentationSpec
Diffstat (limited to 'kamon-jdbc')
-rw-r--r--kamon-jdbc/src/main/resources/META-INF/aop.xml2
-rw-r--r--kamon-jdbc/src/main/scala/kamon/jdbc/Jdbc.scala19
-rw-r--r--kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala33
-rw-r--r--kamon-jdbc/src/main/scala/kamon/jdbc/metric/StatementsMetrics.scala6
-rw-r--r--kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala189
5 files changed, 192 insertions, 57 deletions
diff --git a/kamon-jdbc/src/main/resources/META-INF/aop.xml b/kamon-jdbc/src/main/resources/META-INF/aop.xml
index d9ac097f..dd1bbbbe 100644
--- a/kamon-jdbc/src/main/resources/META-INF/aop.xml
+++ b/kamon-jdbc/src/main/resources/META-INF/aop.xml
@@ -5,7 +5,7 @@
<aspect name="kamon.jdbc.instrumentation.StatementInstrumentation"/>
</aspects>
- <weaver options="-verbose">
+ <weaver>
<include within="java.sql.Statement+..*"/>
<include within="java.sql.Connection+..*"/>
</weaver>
diff --git a/kamon-jdbc/src/main/scala/kamon/jdbc/Jdbc.scala b/kamon-jdbc/src/main/scala/kamon/jdbc/Jdbc.scala
index 7442fb96..3199d27b 100644
--- a/kamon-jdbc/src/main/scala/kamon/jdbc/Jdbc.scala
+++ b/kamon-jdbc/src/main/scala/kamon/jdbc/Jdbc.scala
@@ -1,3 +1,18 @@
+/* =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License") you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
package kamon.jdbc
import java.util.concurrent.TimeUnit.{ MILLISECONDS ⇒ milliseconds }
@@ -29,7 +44,7 @@ class DefaultSlowQueryProcessor extends SlowQueryProcessor {
val log = LoggerFactory.getLogger("slow-query-processor")
- override def process(sql: String, executionTime: Long, queryThreshold: Long): Unit = {
- log.warn(s"The query: $sql took ${executionTime}ms and the slow query threshold is ${queryThreshold}ms")
+ override def process(sql: String, executionTimeInMillis: Long, queryThresholdInMillis: Long): Unit = {
+ log.warn(s"The query [$sql] took $executionTimeInMillis ms and the slow query threshold is $queryThresholdInMillis ms")
}
}
diff --git a/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala b/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala
index faa068aa..f0a83007 100644
--- a/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala
+++ b/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala
@@ -16,7 +16,7 @@
package kamon.jdbc.instrumentation
import java.sql.SQLException
-
+import java.util.concurrent.TimeUnit.{ NANOSECONDS ⇒ nanos }
import akka.actor.ActorSystem
import kamon.Kamon
import kamon.jdbc.Jdbc
@@ -48,11 +48,11 @@ class StatementInstrumentation {
def aroundExecuteStatement(pjp: ProceedingJoinPoint, sql: String): Any = {
TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒
- if (statementRecorder.nonEmpty) {
- statementRecorder = Kamon(Metrics)(system).register(StatementsMetrics("Statements"), StatementsMetrics.Factory)
+ if (statementRecorder.isEmpty) {
+ statementRecorder = Kamon(Metrics)(system).register(StatementsMetrics(Statements), StatementsMetrics.Factory)
}
- sql.replaceAll(CommentPattern, "") match {
+ sql.replaceAll(CommentPattern, Empty) match {
case SelectStatement(_) ⇒ recordRead(pjp, sql)(system)
case InsertStatement(_) | UpdateStatement(_) | DeleteStatement(_) ⇒ recordWrite(pjp)
case anythingElse ⇒
@@ -63,7 +63,7 @@ class StatementInstrumentation {
}
@AfterThrowing(pointcut = "onExecuteStatement(sql) || onExecutePreparedStatement(sql) || onExecutePreparedCall(sql)", throwing = "ex")
- def onError(sql: String, ex:SQLException): Unit = {
+ def onError(sql: String, ex: SQLException): Unit = {
log.error(s"the query [$sql] failed with exception [${ex.getMessage}]")
statementRecorder.map(stmr ⇒ stmr.errors.increment())
}
@@ -74,20 +74,21 @@ class StatementInstrumentation {
}
def recordRead(pjp: ProceedingJoinPoint, sql: String)(system: ActorSystem): Any = {
- withTimeSpent(pjp.proceed()) {
- timeSpent ⇒
- statementRecorder.map(stmr ⇒ stmr.reads.record(timeSpent))
-
- if (timeSpent >= Jdbc(system).slowQueryThreshold) {
- statementRecorder.map(stmr ⇒ stmr.slow.increment())
- Jdbc(system).processSlowQuery(sql, timeSpent)
- }
+ withTimeSpent(pjp.proceed()) { timeSpent ⇒
+ statementRecorder.map(stmr ⇒ stmr.reads.record(timeSpent))
+
+ val timeSpentInMillis = nanos.toMillis(timeSpent)
+
+ if (timeSpentInMillis >= Jdbc(system).slowQueryThreshold) {
+ statementRecorder.map(stmr ⇒ stmr.slow.increment())
+ Jdbc(system).processSlowQuery(sql, timeSpentInMillis)
+ }
}
}
def recordWrite(pjp: ProceedingJoinPoint): Any = {
- withTimeSpent(pjp.proceed()) {
- timeSpent ⇒ statementRecorder.map(stmr ⇒ stmr.writes.record(timeSpent))
+ withTimeSpent(pjp.proceed()) { timeSpent ⇒
+ statementRecorder.map(stmr ⇒ stmr.writes.record(timeSpent))
}
}
}
@@ -100,5 +101,7 @@ object StatementInstrumentation {
val UpdateStatement = "(?i)^\\s*update\\s+([^\\s,;]*).*".r
val DeleteStatement = "(?i)^\\s*delete\\s+from\\s+([^\\s,(;]*).*".r
val CommentPattern = "/\\*.*?\\*/" //for now only removes comments of kind / * anything * /
+ val Empty = ""
+ val Statements = "jdbc-statements"
}
diff --git a/kamon-jdbc/src/main/scala/kamon/jdbc/metric/StatementsMetrics.scala b/kamon-jdbc/src/main/scala/kamon/jdbc/metric/StatementsMetrics.scala
index bc52c249..7ba8b105 100644
--- a/kamon-jdbc/src/main/scala/kamon/jdbc/metric/StatementsMetrics.scala
+++ b/kamon-jdbc/src/main/scala/kamon/jdbc/metric/StatementsMetrics.scala
@@ -43,19 +43,19 @@ object StatementsMetrics extends MetricGroupCategory {
def cleanup: Unit = {}
}
- case class StatementsMetricsSnapshot(writes: Histogram.Snapshot, reads: Histogram.Snapshot, slow: Counter.Snapshot, errors: Counter.Snapshot)
+ case class StatementsMetricsSnapshot(writes: Histogram.Snapshot, reads: Histogram.Snapshot, slows: Counter.Snapshot, errors: Counter.Snapshot)
extends MetricGroupSnapshot {
type GroupSnapshotType = StatementsMetricsSnapshot
def merge(that: StatementsMetricsSnapshot, context: CollectionContext): GroupSnapshotType = {
- StatementsMetricsSnapshot(writes.merge(that.writes, context), reads.merge(that.reads, context), slow.merge(that.slow, context), errors.merge(that.errors, context))
+ StatementsMetricsSnapshot(writes.merge(that.writes, context), reads.merge(that.reads, context), slows.merge(that.slows, context), errors.merge(that.errors, context))
}
lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map(
Writes -> writes,
Reads -> reads,
- Slows -> slow,
+ Slows -> slows,
Reads -> errors)
}
diff --git a/kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala b/kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala
index ec8ca4a1..ee68234d 100644
--- a/kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala
+++ b/kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala
@@ -15,53 +15,170 @@
package kamon.jdbc.instrumentation
-import java.sql.{SQLException, DriverManager}
+import java.sql.{DriverManager, SQLException}
import akka.actor.ActorSystem
-import akka.testkit.TestKit
+import akka.testkit.{TestKitBase, TestProbe}
+import com.typesafe.config.ConfigFactory
+import kamon.Kamon
+import kamon.jdbc.SlowQueryProcessor
+import kamon.jdbc.metric.StatementsMetrics
+import kamon.jdbc.metric.StatementsMetrics.StatementsMetricsSnapshot
+import kamon.metric.Metrics
+import kamon.metric.Subscriptions.TickMetricSnapshot
import kamon.trace.TraceRecorder
-import org.scalatest.{Matchers, WordSpecLike}
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+import scala.concurrent.duration._
-class StatementInstrumentationSpec extends TestKit(ActorSystem("jdbc-spec")) with WordSpecLike with Matchers {
+class StatementInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with BeforeAndAfterAll {
- val connection = DriverManager.getConnection("jdbc:h2:mem:test","SA", "")
+ implicit lazy val system: ActorSystem = ActorSystem("jdbc-spec", ConfigFactory.parseString(
+ """
+ |kamon {
+ | jdbc {
+ | slow-query-threshold = 100 milliseconds
+ |
+ | # Fully qualified name of the implementation of kamon.jdbc.SlowQueryProcessor.
+ | slow-query-processor = kamon.jdbc.instrumentation.NOPSlowQueryProcessor
+ | }
+ |}
+ """.stripMargin))
+
+ val connection = DriverManager.getConnection("jdbc:h2:mem:jdbc-spec","SA", "")
+
+ override protected def beforeAll(): Unit = {
+ connection should not be null
+
+ val create = "CREATE TABLE Address (Nr INTEGER, Name VARCHAR(128));"
+ val createStatement = connection.createStatement()
+ createStatement.executeUpdate(create)
+
+ val sleep = "CREATE ALIAS SLEEP FOR \"java.lang.Thread.sleep(long)\""
+ val sleepStatement = connection.createStatement()
+ sleepStatement.executeUpdate(sleep)
+ }
"the StatementInstrumentation" should {
- "bblabals" in {
- TraceRecorder.withNewTraceContext("jdbc-trace") {
- connection should not be null
-
- val create = "CREATE TABLE Address (Nr INTEGER, Name VARCHAR(128));"
- val createStatement = connection.createStatement()
- createStatement.executeUpdate(create)
-
- val insert = "INSERT INTO Address (Nr, Name) VALUES(1, 'foo')"
- val insertStatement = connection.prepareStatement(insert)
- insertStatement.execute()
-
- val select =
- """
- |/*this is a comment*/
- |SELECT * FROM Address""".stripMargin
- val selectStatement = connection.prepareCall(select)
- selectStatement.execute()
-
- val update = "UPDATE Address SET Name = 'bar' where Nr = 1"
- val updateStatement = connection.createStatement()
- updateStatement.execute(update)
-
- val delete = "DELETE FROM Address where Nr = 1"
- val deleteStatement = connection.createStatement()
- deleteStatement.execute(delete)
-
- intercept[SQLException] {
- val error = "SELECT * FROM NON_EXIST_TABLE"
- val errorStatement = connection.createStatement()
- errorStatement.execute(error)
+ "record the execution time of the INSERT operation" in new StatementsMetricsListenerFixture {
+ TraceRecorder.withNewTraceContext("jdbc-trace-insert") {
+
+ val metricsListener = subscribeToMetrics()
+
+ for(id <- 1 to 100) {
+ val insert = s"INSERT INTO Address (Nr, Name) VALUES($id, 'foo')"
+ val insertStatement = connection.prepareStatement(insert)
+ insertStatement.execute()
+ }
+
+ val StatementMetrics = expectStatementsMetrics(metricsListener, 3 seconds)
+ StatementMetrics.writes.numberOfMeasurements should be(100)
+ }
+ }
+
+ "record the execution time of SELECT operation" in new StatementsMetricsListenerFixture {
+ TraceRecorder.withNewTraceContext("jdbc-trace-select") {
+
+ val metricsListener = subscribeToMetrics()
+
+ for(id <- 1 to 100) {
+ val select = s"SELECT * FROM Address where Nr = $id"
+ val selectStatement = connection.createStatement()
+ selectStatement.execute(select)
+ }
+
+ val StatementMetrics = expectStatementsMetrics(metricsListener, 3 seconds)
+ StatementMetrics.reads.numberOfMeasurements should be(100)
+ }
+ }
+
+ "record the execution time of UPDATE operation" in new StatementsMetricsListenerFixture {
+ TraceRecorder.withNewTraceContext("jdbc-trace-update") {
+
+ val metricsListener = subscribeToMetrics()
+
+ for(id <- 1 to 100) {
+ val update = s"UPDATE Address SET Name = 'bar$id' where Nr = $id"
+ val updateStatement = connection.prepareStatement(update)
+ updateStatement.execute()
+ }
+
+ val StatementMetrics = expectStatementsMetrics(metricsListener, 3 seconds)
+ StatementMetrics.writes.numberOfMeasurements should be(100)
+ }
+ }
+
+ "record the execution time of DELETE operation" in new StatementsMetricsListenerFixture {
+ TraceRecorder.withNewTraceContext("jdbc-trace-insert") {
+
+ val metricsListener = subscribeToMetrics()
+
+ for(id <- 1 to 100) {
+ val delete = s"DELETE FROM Address where Nr = $id"
+ val deleteStatement = connection.createStatement()
+ deleteStatement.execute(delete)
}
+ val StatementMetrics = expectStatementsMetrics(metricsListener, 3 seconds)
+ StatementMetrics.writes.numberOfMeasurements should be(100)
}
}
+
+ "record the execution time of SLOW QUERIES based on the kamon.jdbc.slow-query-threshold" in new StatementsMetricsListenerFixture {
+ TraceRecorder.withNewTraceContext("jdbc-trace-slow") {
+
+ val metricsListener = subscribeToMetrics()
+
+ for(id <- 1 to 2) {
+ val select = s"SELECT * FROM Address; CALL SLEEP(100)"
+ val selectStatement = connection.createStatement()
+ selectStatement.execute(select)
+ }
+
+ val StatementMetrics = expectStatementsMetrics(metricsListener, 3 seconds)
+ StatementMetrics.slows.count should be(2)
+ }
+ }
+
+ "count the total ERRORS" in new StatementsMetricsListenerFixture {
+ TraceRecorder.withNewTraceContext("jdbc-trace-slow") {
+
+ val metricsListener = subscribeToMetrics()
+
+ for(_ <- 1 to 10) {
+ intercept[SQLException] {
+ val error = "SELECT * FROM NON_EXIST_TABLE"
+ val errorStatement = connection.createStatement()
+ errorStatement.execute(error)
+ }
+ }
+ val StatementMetrics = expectStatementsMetrics(metricsListener, 3 seconds)
+ StatementMetrics.errors.count should be(10)
+ }
+ }
+ }
+
+ trait StatementsMetricsListenerFixture {
+ def subscribeToMetrics( ): TestProbe = {
+ val metricsListener = TestProbe()
+ Kamon(Metrics).subscribe(StatementsMetrics, "*", metricsListener.ref, permanently = true)
+ // Wait for one empty snapshot before proceeding to the test.
+ metricsListener.expectMsgType[TickMetricSnapshot]
+ metricsListener
+ }
+ }
+
+ def expectStatementsMetrics(listener: TestProbe, waitTime: FiniteDuration): StatementsMetricsSnapshot = {
+ val tickSnapshot = within(waitTime) {
+ listener.expectMsgType[TickMetricSnapshot]
+ }
+ val statementsMetricsOption = tickSnapshot.metrics.get(StatementsMetrics(StatementInstrumentation.Statements))
+ statementsMetricsOption should not be empty
+ statementsMetricsOption.get.asInstanceOf[StatementsMetricsSnapshot]
}
}
+class NOPSlowQueryProcessor extends SlowQueryProcessor {
+ override def process(sql: String, executionTimeInMillis: Long, queryThresholdInMillis: Long): Unit = {/*do nothing!!!*/}
+}
+
+