diff options
8 files changed, 339 insertions, 1 deletions
diff --git a/kamon-jdbc/src/main/resources/META-INF/aop.xml b/kamon-jdbc/src/main/resources/META-INF/aop.xml new file mode 100644 index 00000000..d9ac097f --- /dev/null +++ b/kamon-jdbc/src/main/resources/META-INF/aop.xml @@ -0,0 +1,12 @@ +<!DOCTYPE aspectj PUBLIC "-//AspectJ//DTD//EN" "http://www.eclipse.org/aspectj/dtd/aspectj.dtd"> + +<aspectj> + <aspects> + <aspect name="kamon.jdbc.instrumentation.StatementInstrumentation"/> + </aspects> + + <weaver options="-verbose"> + <include within="java.sql.Statement+..*"/> + <include within="java.sql.Connection+..*"/> + </weaver> +</aspectj> diff --git a/kamon-jdbc/src/main/resources/reference.conf b/kamon-jdbc/src/main/resources/reference.conf new file mode 100644 index 00000000..ce114861 --- /dev/null +++ b/kamon-jdbc/src/main/resources/reference.conf @@ -0,0 +1,23 @@ +# ================================== # +# Kamon-jdbc Reference Configuration # +# ================================== # + +kamon { + jdbc { + slow-query-threshold = 2 seconds + + # Fully qualified name of the implementation of kamon.jdbc.SlowQueryProcessor. + slow-query-processor = kamon.jdbc.DefaultSlowQueryProcessor + } + + metrics { + precision { + jdbc { + statements { + reads = ${kamon.metrics.precision.default-histogram-precision} + writes = ${kamon.metrics.precision.default-histogram-precision} + } + } + } + } +}
\ No newline at end of file diff --git a/kamon-jdbc/src/main/scala/kamon/jdbc/Jdbc.scala b/kamon-jdbc/src/main/scala/kamon/jdbc/Jdbc.scala new file mode 100644 index 00000000..7442fb96 --- /dev/null +++ b/kamon-jdbc/src/main/scala/kamon/jdbc/Jdbc.scala @@ -0,0 +1,35 @@ +package kamon.jdbc + +import java.util.concurrent.TimeUnit.{ MILLISECONDS ⇒ milliseconds } + +import akka.actor.{ ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider } +import kamon.Kamon + +object Jdbc extends ExtensionId[JdbcExtension] with ExtensionIdProvider { + override def lookup(): ExtensionId[_ <: Extension] = Jdbc + override def createExtension(system: ExtendedActorSystem): JdbcExtension = new JdbcExtension(system) +} + +class JdbcExtension(system: ExtendedActorSystem) extends Kamon.Extension { + private val config = system.settings.config.getConfig("kamon.jdbc") + private val slowQueryProcessorClass = config.getString("slow-query-processor") + private val slowQueryProcessor: SlowQueryProcessor = system.dynamicAccess.createInstanceFor[SlowQueryProcessor](slowQueryProcessorClass, Nil).get + + val slowQueryThreshold = config.getDuration("slow-query-threshold", milliseconds) + + def processSlowQuery(sql: String, executionTime: Long) = slowQueryProcessor.process(sql, executionTime, slowQueryThreshold) +} + +trait SlowQueryProcessor { + def process(sql: String, executionTime: Long, queryThreshold: Long): Unit +} + +class DefaultSlowQueryProcessor extends SlowQueryProcessor { + import org.slf4j.LoggerFactory + + val log = LoggerFactory.getLogger("slow-query-processor") + + override def process(sql: String, executionTime: Long, queryThreshold: Long): Unit = { + log.warn(s"The query: $sql took ${executionTime}ms and the slow query threshold is ${queryThreshold}ms") + } +} diff --git a/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala b/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala new file mode 100644 index 00000000..faa068aa --- /dev/null +++ b/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala @@ -0,0 +1,104 @@ +/* ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.jdbc.instrumentation + +import java.sql.SQLException + +import akka.actor.ActorSystem +import kamon.Kamon +import kamon.jdbc.Jdbc +import kamon.jdbc.metric.StatementsMetrics +import kamon.jdbc.metric.StatementsMetrics.StatementsMetricsRecorder +import kamon.metric.Metrics +import kamon.trace.TraceRecorder +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation.{ AfterThrowing, Around, Aspect, Pointcut } +import org.slf4j.LoggerFactory + +@Aspect +class StatementInstrumentation { + + import StatementInstrumentation._ + + @volatile var statementRecorder: Option[StatementsMetricsRecorder] = Option.empty + + @Pointcut("execution(* java.sql.Statement.execute*(..)) && args(sql)") + def onExecuteStatement(sql: String): Unit = {} + + @Pointcut("execution(* java.sql.Connection.prepareStatement(..)) && args(sql)") + def onExecutePreparedStatement(sql: String): Unit = {} + + @Pointcut("execution(* java.sql.Connection.prepareCall(..)) && args(sql)") + def onExecutePreparedCall(sql: String): Unit = {} + + @Around("onExecuteStatement(sql) || onExecutePreparedStatement(sql) || onExecutePreparedCall(sql)") + def aroundExecuteStatement(pjp: ProceedingJoinPoint, sql: String): Any = { + TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒ + + if (statementRecorder.nonEmpty) { + statementRecorder = Kamon(Metrics)(system).register(StatementsMetrics("Statements"), StatementsMetrics.Factory) + } + + sql.replaceAll(CommentPattern, "") match { + case SelectStatement(_) ⇒ recordRead(pjp, sql)(system) + case InsertStatement(_) | UpdateStatement(_) | DeleteStatement(_) ⇒ recordWrite(pjp) + case anythingElse ⇒ + log.debug(s"Unable to parse sql [$sql]") + pjp.proceed() + } + } getOrElse pjp.proceed() + } + + @AfterThrowing(pointcut = "onExecuteStatement(sql) || onExecutePreparedStatement(sql) || onExecutePreparedCall(sql)", throwing = "ex") + def onError(sql: String, ex:SQLException): Unit = { + log.error(s"the query [$sql] failed with exception [${ex.getMessage}]") + statementRecorder.map(stmr ⇒ stmr.errors.increment()) + } + + def withTimeSpent[A](thunk: ⇒ A)(timeSpent: Long ⇒ Unit): A = { + val start = System.nanoTime() + try thunk finally timeSpent(System.nanoTime() - start) + } + + def recordRead(pjp: ProceedingJoinPoint, sql: String)(system: ActorSystem): Any = { + withTimeSpent(pjp.proceed()) { + timeSpent ⇒ + statementRecorder.map(stmr ⇒ stmr.reads.record(timeSpent)) + + if (timeSpent >= Jdbc(system).slowQueryThreshold) { + statementRecorder.map(stmr ⇒ stmr.slow.increment()) + Jdbc(system).processSlowQuery(sql, timeSpent) + } + } + } + + def recordWrite(pjp: ProceedingJoinPoint): Any = { + withTimeSpent(pjp.proceed()) { + timeSpent ⇒ statementRecorder.map(stmr ⇒ stmr.writes.record(timeSpent)) + } + } +} + +object StatementInstrumentation { + val log = LoggerFactory.getLogger("StatementInstrumentation") + + val SelectStatement = "(?i)^\\s*select.*?\\sfrom[\\s\\[]+([^\\]\\s,)(;]*).*".r + val InsertStatement = "(?i)^\\s*insert(?:\\s+ignore)?\\s+into\\s+([^\\s(,;]*).*".r + val UpdateStatement = "(?i)^\\s*update\\s+([^\\s,;]*).*".r + val DeleteStatement = "(?i)^\\s*delete\\s+from\\s+([^\\s,(;]*).*".r + val CommentPattern = "/\\*.*?\\*/" //for now only removes comments of kind / * anything * / +} + diff --git a/kamon-jdbc/src/main/scala/kamon/jdbc/metric/StatementsMetrics.scala b/kamon-jdbc/src/main/scala/kamon/jdbc/metric/StatementsMetrics.scala new file mode 100644 index 00000000..bc52c249 --- /dev/null +++ b/kamon-jdbc/src/main/scala/kamon/jdbc/metric/StatementsMetrics.scala @@ -0,0 +1,83 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.jdbc.metric + +import akka.actor.ActorSystem +import com.typesafe.config.Config +import kamon.metric.{ CollectionContext, MetricGroupCategory, MetricGroupFactory, MetricGroupIdentity, MetricGroupRecorder, MetricGroupSnapshot, MetricIdentity, MetricSnapshot } +import kamon.metric.instrument.{ Counter, Histogram } + +case class StatementsMetrics(name: String) extends MetricGroupIdentity { + val category = StatementsMetrics +} + +object StatementsMetrics extends MetricGroupCategory { + val name = "jdbc-statements" + + case object Writes extends MetricIdentity { val name = "writes" } + case object Reads extends MetricIdentity { val name = "reads" } + case object Slows extends MetricIdentity { val name = "slow-queries" } + case object Errors extends MetricIdentity { val name = "errors" } + + case class StatementsMetricsRecorder(writes: Histogram, reads: Histogram, slow: Counter, errors: Counter) + extends MetricGroupRecorder { + + def collect(context: CollectionContext): MetricGroupSnapshot = { + StatementsMetricsSnapshot(writes.collect(context), reads.collect(context), slow.collect(context), errors.collect(context)) + } + + def cleanup: Unit = {} + } + + case class StatementsMetricsSnapshot(writes: Histogram.Snapshot, reads: Histogram.Snapshot, slow: Counter.Snapshot, errors: Counter.Snapshot) + extends MetricGroupSnapshot { + + type GroupSnapshotType = StatementsMetricsSnapshot + + def merge(that: StatementsMetricsSnapshot, context: CollectionContext): GroupSnapshotType = { + StatementsMetricsSnapshot(writes.merge(that.writes, context), reads.merge(that.reads, context), slow.merge(that.slow, context), errors.merge(that.errors, context)) + } + + lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map( + Writes -> writes, + Reads -> reads, + Slows -> slow, + Reads -> errors) + } + + val Factory = StatementsMetricsGroupFactory +} + +case object StatementsMetricsGroupFactory extends MetricGroupFactory { + import kamon.jdbc.metric.StatementsMetrics._ + + type GroupRecorder = StatementsMetricsRecorder + + def create(config: Config, system: ActorSystem): GroupRecorder = { + val settings = config.getConfig("precision.jdbc.statements") + + val writesConfig = settings.getConfig("writes") + val readsConfig = settings.getConfig("reads") + + new StatementsMetricsRecorder( + Histogram.fromConfig(writesConfig), + Histogram.fromConfig(readsConfig), + Counter(), + Counter()) + } +} + diff --git a/kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala b/kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala new file mode 100644 index 00000000..ec8ca4a1 --- /dev/null +++ b/kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala @@ -0,0 +1,67 @@ +/* ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License") you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.jdbc.instrumentation + +import java.sql.{SQLException, DriverManager} + +import akka.actor.ActorSystem +import akka.testkit.TestKit +import kamon.trace.TraceRecorder +import org.scalatest.{Matchers, WordSpecLike} + +class StatementInstrumentationSpec extends TestKit(ActorSystem("jdbc-spec")) with WordSpecLike with Matchers { + + val connection = DriverManager.getConnection("jdbc:h2:mem:test","SA", "") + + "the StatementInstrumentation" should { + "bblabals" in { + TraceRecorder.withNewTraceContext("jdbc-trace") { + connection should not be null + + val create = "CREATE TABLE Address (Nr INTEGER, Name VARCHAR(128));" + val createStatement = connection.createStatement() + createStatement.executeUpdate(create) + + val insert = "INSERT INTO Address (Nr, Name) VALUES(1, 'foo')" + val insertStatement = connection.prepareStatement(insert) + insertStatement.execute() + + val select = + """ + |/*this is a comment*/ + |SELECT * FROM Address""".stripMargin + val selectStatement = connection.prepareCall(select) + selectStatement.execute() + + val update = "UPDATE Address SET Name = 'bar' where Nr = 1" + val updateStatement = connection.createStatement() + updateStatement.execute(update) + + val delete = "DELETE FROM Address where Nr = 1" + val deleteStatement = connection.createStatement() + deleteStatement.execute(delete) + + intercept[SQLException] { + val error = "SELECT * FROM NON_EXIST_TABLE" + val errorStatement = connection.createStatement() + errorStatement.execute(error) + } + + } + } + } +} + diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 4eec01c1..8ff42314 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -27,6 +27,7 @@ object Dependencies { val aspectjVersion = "1.8.4" val slf4jVersion = "1.7.6" val playVersion = "2.3.5" + val sigarVersion = "1.6.5.132" val sprayJson = "io.spray" %% "spray-json" % "1.3.0" val sprayJsonLenses = "net.virtual-void" %% "json-lenses" % "0.5.4" @@ -50,8 +51,10 @@ object Dependencies { val slf4Api = "org.slf4j" % "slf4j-api" % slf4jVersion val slf4nop = "org.slf4j" % "slf4j-nop" % slf4jVersion val scalaCompiler = "org.scala-lang" % "scala-compiler" % Settings.ScalaVersion + val sigar = "org.fusesource" % "sigar" % "1.6.4" // TODO remove val scalazConcurrent = "org.scalaz" %% "scalaz-concurrent" % "7.1.0" - val sigarLoader = "io.kamon" % "sigar-loader" % "1.6.5-rev001" + val sigarLoader = "io.kamon" + val h2 = "com.h2database" % "sigar-loader" % "1.6.5-rev001" def compile (deps: ModuleID*): Seq[ModuleID] = deps map (_ % "compile") def provided (deps: ModuleID*): Seq[ModuleID] = deps map (_ % "provided") diff --git a/project/Projects.scala b/project/Projects.scala index a284da11..710bdc9f 100644 --- a/project/Projects.scala +++ b/project/Projects.scala @@ -171,6 +171,17 @@ object Projects extends Build { compile(sigarLoader) ++ test(scalatest, akkaTestKit, slf4Api, slf4nop)) .dependsOn(kamonCore) + +lazy val kamonJdbc = Project("kamon-jdbc", file("kamon-jdbc")) + .settings(basicSettings: _*) + .settings(formatSettings: _*) + .settings(aspectJSettings: _*) + .settings( + libraryDependencies ++= + test(h2,scalatest, akkaTestKit, slf4Api, slf4nop) ++ + provided(aspectJ)) + .dependsOn(kamonCore) + val noPublishing = Seq(publish := (), publishLocal := (), publishArtifact := false) } |