aboutsummaryrefslogtreecommitdiff
path: root/kamon-jdbc
diff options
context:
space:
mode:
authorDiego <diegolparra@gmail.com>2014-11-23 14:55:46 -0300
committerDiego <diegolparra@gmail.com>2014-12-04 02:26:25 -0300
commitab9fd324a5df3e411e952576b78dc627595e00bf (patch)
tree85cee1b683130ab0a319d072005701327723e746 /kamon-jdbc
parentbbe12e7c5c1897b6f10486cb216391b33224d5f7 (diff)
downloadKamon-ab9fd324a5df3e411e952576b78dc627595e00bf.tar.gz
Kamon-ab9fd324a5df3e411e952576b78dc627595e00bf.tar.bz2
Kamon-ab9fd324a5df3e411e952576b78dc627595e00bf.zip
! kamon-jdbc: instroduce Statements Metrics like writes, reads, errors and slows queries counter
Diffstat (limited to 'kamon-jdbc')
-rw-r--r--kamon-jdbc/src/main/resources/META-INF/aop.xml12
-rw-r--r--kamon-jdbc/src/main/resources/reference.conf23
-rw-r--r--kamon-jdbc/src/main/scala/kamon/jdbc/Jdbc.scala35
-rw-r--r--kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala104
-rw-r--r--kamon-jdbc/src/main/scala/kamon/jdbc/metric/StatementsMetrics.scala83
-rw-r--r--kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala67
6 files changed, 324 insertions, 0 deletions
diff --git a/kamon-jdbc/src/main/resources/META-INF/aop.xml b/kamon-jdbc/src/main/resources/META-INF/aop.xml
new file mode 100644
index 00000000..d9ac097f
--- /dev/null
+++ b/kamon-jdbc/src/main/resources/META-INF/aop.xml
@@ -0,0 +1,12 @@
+<!DOCTYPE aspectj PUBLIC "-//AspectJ//DTD//EN" "http://www.eclipse.org/aspectj/dtd/aspectj.dtd">
+
+<aspectj>
+ <aspects>
+ <aspect name="kamon.jdbc.instrumentation.StatementInstrumentation"/>
+ </aspects>
+
+ <weaver options="-verbose">
+ <include within="java.sql.Statement+..*"/>
+ <include within="java.sql.Connection+..*"/>
+ </weaver>
+</aspectj>
diff --git a/kamon-jdbc/src/main/resources/reference.conf b/kamon-jdbc/src/main/resources/reference.conf
new file mode 100644
index 00000000..ce114861
--- /dev/null
+++ b/kamon-jdbc/src/main/resources/reference.conf
@@ -0,0 +1,23 @@
+# ================================== #
+# Kamon-jdbc Reference Configuration #
+# ================================== #
+
+kamon {
+ jdbc {
+ slow-query-threshold = 2 seconds
+
+ # Fully qualified name of the implementation of kamon.jdbc.SlowQueryProcessor.
+ slow-query-processor = kamon.jdbc.DefaultSlowQueryProcessor
+ }
+
+ metrics {
+ precision {
+ jdbc {
+ statements {
+ reads = ${kamon.metrics.precision.default-histogram-precision}
+ writes = ${kamon.metrics.precision.default-histogram-precision}
+ }
+ }
+ }
+ }
+} \ No newline at end of file
diff --git a/kamon-jdbc/src/main/scala/kamon/jdbc/Jdbc.scala b/kamon-jdbc/src/main/scala/kamon/jdbc/Jdbc.scala
new file mode 100644
index 00000000..7442fb96
--- /dev/null
+++ b/kamon-jdbc/src/main/scala/kamon/jdbc/Jdbc.scala
@@ -0,0 +1,35 @@
+package kamon.jdbc
+
+import java.util.concurrent.TimeUnit.{ MILLISECONDS ⇒ milliseconds }
+
+import akka.actor.{ ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider }
+import kamon.Kamon
+
+object Jdbc extends ExtensionId[JdbcExtension] with ExtensionIdProvider {
+ override def lookup(): ExtensionId[_ <: Extension] = Jdbc
+ override def createExtension(system: ExtendedActorSystem): JdbcExtension = new JdbcExtension(system)
+}
+
+class JdbcExtension(system: ExtendedActorSystem) extends Kamon.Extension {
+ private val config = system.settings.config.getConfig("kamon.jdbc")
+ private val slowQueryProcessorClass = config.getString("slow-query-processor")
+ private val slowQueryProcessor: SlowQueryProcessor = system.dynamicAccess.createInstanceFor[SlowQueryProcessor](slowQueryProcessorClass, Nil).get
+
+ val slowQueryThreshold = config.getDuration("slow-query-threshold", milliseconds)
+
+ def processSlowQuery(sql: String, executionTime: Long) = slowQueryProcessor.process(sql, executionTime, slowQueryThreshold)
+}
+
+trait SlowQueryProcessor {
+ def process(sql: String, executionTime: Long, queryThreshold: Long): Unit
+}
+
+class DefaultSlowQueryProcessor extends SlowQueryProcessor {
+ import org.slf4j.LoggerFactory
+
+ val log = LoggerFactory.getLogger("slow-query-processor")
+
+ override def process(sql: String, executionTime: Long, queryThreshold: Long): Unit = {
+ log.warn(s"The query: $sql took ${executionTime}ms and the slow query threshold is ${queryThreshold}ms")
+ }
+}
diff --git a/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala b/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala
new file mode 100644
index 00000000..faa068aa
--- /dev/null
+++ b/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala
@@ -0,0 +1,104 @@
+/* =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.jdbc.instrumentation
+
+import java.sql.SQLException
+
+import akka.actor.ActorSystem
+import kamon.Kamon
+import kamon.jdbc.Jdbc
+import kamon.jdbc.metric.StatementsMetrics
+import kamon.jdbc.metric.StatementsMetrics.StatementsMetricsRecorder
+import kamon.metric.Metrics
+import kamon.trace.TraceRecorder
+import org.aspectj.lang.ProceedingJoinPoint
+import org.aspectj.lang.annotation.{ AfterThrowing, Around, Aspect, Pointcut }
+import org.slf4j.LoggerFactory
+
+@Aspect
+class StatementInstrumentation {
+
+ import StatementInstrumentation._
+
+ @volatile var statementRecorder: Option[StatementsMetricsRecorder] = Option.empty
+
+ @Pointcut("execution(* java.sql.Statement.execute*(..)) && args(sql)")
+ def onExecuteStatement(sql: String): Unit = {}
+
+ @Pointcut("execution(* java.sql.Connection.prepareStatement(..)) && args(sql)")
+ def onExecutePreparedStatement(sql: String): Unit = {}
+
+ @Pointcut("execution(* java.sql.Connection.prepareCall(..)) && args(sql)")
+ def onExecutePreparedCall(sql: String): Unit = {}
+
+ @Around("onExecuteStatement(sql) || onExecutePreparedStatement(sql) || onExecutePreparedCall(sql)")
+ def aroundExecuteStatement(pjp: ProceedingJoinPoint, sql: String): Any = {
+ TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒
+
+ if (statementRecorder.nonEmpty) {
+ statementRecorder = Kamon(Metrics)(system).register(StatementsMetrics("Statements"), StatementsMetrics.Factory)
+ }
+
+ sql.replaceAll(CommentPattern, "") match {
+ case SelectStatement(_) ⇒ recordRead(pjp, sql)(system)
+ case InsertStatement(_) | UpdateStatement(_) | DeleteStatement(_) ⇒ recordWrite(pjp)
+ case anythingElse ⇒
+ log.debug(s"Unable to parse sql [$sql]")
+ pjp.proceed()
+ }
+ } getOrElse pjp.proceed()
+ }
+
+ @AfterThrowing(pointcut = "onExecuteStatement(sql) || onExecutePreparedStatement(sql) || onExecutePreparedCall(sql)", throwing = "ex")
+ def onError(sql: String, ex:SQLException): Unit = {
+ log.error(s"the query [$sql] failed with exception [${ex.getMessage}]")
+ statementRecorder.map(stmr ⇒ stmr.errors.increment())
+ }
+
+ def withTimeSpent[A](thunk: ⇒ A)(timeSpent: Long ⇒ Unit): A = {
+ val start = System.nanoTime()
+ try thunk finally timeSpent(System.nanoTime() - start)
+ }
+
+ def recordRead(pjp: ProceedingJoinPoint, sql: String)(system: ActorSystem): Any = {
+ withTimeSpent(pjp.proceed()) {
+ timeSpent ⇒
+ statementRecorder.map(stmr ⇒ stmr.reads.record(timeSpent))
+
+ if (timeSpent >= Jdbc(system).slowQueryThreshold) {
+ statementRecorder.map(stmr ⇒ stmr.slow.increment())
+ Jdbc(system).processSlowQuery(sql, timeSpent)
+ }
+ }
+ }
+
+ def recordWrite(pjp: ProceedingJoinPoint): Any = {
+ withTimeSpent(pjp.proceed()) {
+ timeSpent ⇒ statementRecorder.map(stmr ⇒ stmr.writes.record(timeSpent))
+ }
+ }
+}
+
+object StatementInstrumentation {
+ val log = LoggerFactory.getLogger("StatementInstrumentation")
+
+ val SelectStatement = "(?i)^\\s*select.*?\\sfrom[\\s\\[]+([^\\]\\s,)(;]*).*".r
+ val InsertStatement = "(?i)^\\s*insert(?:\\s+ignore)?\\s+into\\s+([^\\s(,;]*).*".r
+ val UpdateStatement = "(?i)^\\s*update\\s+([^\\s,;]*).*".r
+ val DeleteStatement = "(?i)^\\s*delete\\s+from\\s+([^\\s,(;]*).*".r
+ val CommentPattern = "/\\*.*?\\*/" //for now only removes comments of kind / * anything * /
+}
+
diff --git a/kamon-jdbc/src/main/scala/kamon/jdbc/metric/StatementsMetrics.scala b/kamon-jdbc/src/main/scala/kamon/jdbc/metric/StatementsMetrics.scala
new file mode 100644
index 00000000..bc52c249
--- /dev/null
+++ b/kamon-jdbc/src/main/scala/kamon/jdbc/metric/StatementsMetrics.scala
@@ -0,0 +1,83 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.jdbc.metric
+
+import akka.actor.ActorSystem
+import com.typesafe.config.Config
+import kamon.metric.{ CollectionContext, MetricGroupCategory, MetricGroupFactory, MetricGroupIdentity, MetricGroupRecorder, MetricGroupSnapshot, MetricIdentity, MetricSnapshot }
+import kamon.metric.instrument.{ Counter, Histogram }
+
+case class StatementsMetrics(name: String) extends MetricGroupIdentity {
+ val category = StatementsMetrics
+}
+
+object StatementsMetrics extends MetricGroupCategory {
+ val name = "jdbc-statements"
+
+ case object Writes extends MetricIdentity { val name = "writes" }
+ case object Reads extends MetricIdentity { val name = "reads" }
+ case object Slows extends MetricIdentity { val name = "slow-queries" }
+ case object Errors extends MetricIdentity { val name = "errors" }
+
+ case class StatementsMetricsRecorder(writes: Histogram, reads: Histogram, slow: Counter, errors: Counter)
+ extends MetricGroupRecorder {
+
+ def collect(context: CollectionContext): MetricGroupSnapshot = {
+ StatementsMetricsSnapshot(writes.collect(context), reads.collect(context), slow.collect(context), errors.collect(context))
+ }
+
+ def cleanup: Unit = {}
+ }
+
+ case class StatementsMetricsSnapshot(writes: Histogram.Snapshot, reads: Histogram.Snapshot, slow: Counter.Snapshot, errors: Counter.Snapshot)
+ extends MetricGroupSnapshot {
+
+ type GroupSnapshotType = StatementsMetricsSnapshot
+
+ def merge(that: StatementsMetricsSnapshot, context: CollectionContext): GroupSnapshotType = {
+ StatementsMetricsSnapshot(writes.merge(that.writes, context), reads.merge(that.reads, context), slow.merge(that.slow, context), errors.merge(that.errors, context))
+ }
+
+ lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map(
+ Writes -> writes,
+ Reads -> reads,
+ Slows -> slow,
+ Reads -> errors)
+ }
+
+ val Factory = StatementsMetricsGroupFactory
+}
+
+case object StatementsMetricsGroupFactory extends MetricGroupFactory {
+ import kamon.jdbc.metric.StatementsMetrics._
+
+ type GroupRecorder = StatementsMetricsRecorder
+
+ def create(config: Config, system: ActorSystem): GroupRecorder = {
+ val settings = config.getConfig("precision.jdbc.statements")
+
+ val writesConfig = settings.getConfig("writes")
+ val readsConfig = settings.getConfig("reads")
+
+ new StatementsMetricsRecorder(
+ Histogram.fromConfig(writesConfig),
+ Histogram.fromConfig(readsConfig),
+ Counter(),
+ Counter())
+ }
+}
+
diff --git a/kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala b/kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala
new file mode 100644
index 00000000..ec8ca4a1
--- /dev/null
+++ b/kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala
@@ -0,0 +1,67 @@
+/* =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License") you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.jdbc.instrumentation
+
+import java.sql.{SQLException, DriverManager}
+
+import akka.actor.ActorSystem
+import akka.testkit.TestKit
+import kamon.trace.TraceRecorder
+import org.scalatest.{Matchers, WordSpecLike}
+
+class StatementInstrumentationSpec extends TestKit(ActorSystem("jdbc-spec")) with WordSpecLike with Matchers {
+
+ val connection = DriverManager.getConnection("jdbc:h2:mem:test","SA", "")
+
+ "the StatementInstrumentation" should {
+ "bblabals" in {
+ TraceRecorder.withNewTraceContext("jdbc-trace") {
+ connection should not be null
+
+ val create = "CREATE TABLE Address (Nr INTEGER, Name VARCHAR(128));"
+ val createStatement = connection.createStatement()
+ createStatement.executeUpdate(create)
+
+ val insert = "INSERT INTO Address (Nr, Name) VALUES(1, 'foo')"
+ val insertStatement = connection.prepareStatement(insert)
+ insertStatement.execute()
+
+ val select =
+ """
+ |/*this is a comment*/
+ |SELECT * FROM Address""".stripMargin
+ val selectStatement = connection.prepareCall(select)
+ selectStatement.execute()
+
+ val update = "UPDATE Address SET Name = 'bar' where Nr = 1"
+ val updateStatement = connection.createStatement()
+ updateStatement.execute(update)
+
+ val delete = "DELETE FROM Address where Nr = 1"
+ val deleteStatement = connection.createStatement()
+ deleteStatement.execute(delete)
+
+ intercept[SQLException] {
+ val error = "SELECT * FROM NON_EXIST_TABLE"
+ val errorStatement = connection.createStatement()
+ errorStatement.execute(error)
+ }
+
+ }
+ }
+ }
+}
+