aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kamon-jdbc/src/main/resources/META-INF/aop.xml12
-rw-r--r--kamon-jdbc/src/main/resources/reference.conf23
-rw-r--r--kamon-jdbc/src/main/scala/kamon/jdbc/Jdbc.scala35
-rw-r--r--kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala104
-rw-r--r--kamon-jdbc/src/main/scala/kamon/jdbc/metric/StatementsMetrics.scala83
-rw-r--r--kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala67
-rw-r--r--project/Dependencies.scala5
-rw-r--r--project/Projects.scala11
8 files changed, 339 insertions, 1 deletions
diff --git a/kamon-jdbc/src/main/resources/META-INF/aop.xml b/kamon-jdbc/src/main/resources/META-INF/aop.xml
new file mode 100644
index 00000000..d9ac097f
--- /dev/null
+++ b/kamon-jdbc/src/main/resources/META-INF/aop.xml
@@ -0,0 +1,12 @@
+<!DOCTYPE aspectj PUBLIC "-//AspectJ//DTD//EN" "http://www.eclipse.org/aspectj/dtd/aspectj.dtd">
+
+<aspectj>
+ <aspects>
+ <aspect name="kamon.jdbc.instrumentation.StatementInstrumentation"/>
+ </aspects>
+
+ <weaver options="-verbose">
+ <include within="java.sql.Statement+..*"/>
+ <include within="java.sql.Connection+..*"/>
+ </weaver>
+</aspectj>
diff --git a/kamon-jdbc/src/main/resources/reference.conf b/kamon-jdbc/src/main/resources/reference.conf
new file mode 100644
index 00000000..ce114861
--- /dev/null
+++ b/kamon-jdbc/src/main/resources/reference.conf
@@ -0,0 +1,23 @@
+# ================================== #
+# Kamon-jdbc Reference Configuration #
+# ================================== #
+
+kamon {
+ jdbc {
+ slow-query-threshold = 2 seconds
+
+ # Fully qualified name of the implementation of kamon.jdbc.SlowQueryProcessor.
+ slow-query-processor = kamon.jdbc.DefaultSlowQueryProcessor
+ }
+
+ metrics {
+ precision {
+ jdbc {
+ statements {
+ reads = ${kamon.metrics.precision.default-histogram-precision}
+ writes = ${kamon.metrics.precision.default-histogram-precision}
+ }
+ }
+ }
+ }
+} \ No newline at end of file
diff --git a/kamon-jdbc/src/main/scala/kamon/jdbc/Jdbc.scala b/kamon-jdbc/src/main/scala/kamon/jdbc/Jdbc.scala
new file mode 100644
index 00000000..7442fb96
--- /dev/null
+++ b/kamon-jdbc/src/main/scala/kamon/jdbc/Jdbc.scala
@@ -0,0 +1,35 @@
+package kamon.jdbc
+
+import java.util.concurrent.TimeUnit.{ MILLISECONDS ⇒ milliseconds }
+
+import akka.actor.{ ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider }
+import kamon.Kamon
+
+object Jdbc extends ExtensionId[JdbcExtension] with ExtensionIdProvider {
+ override def lookup(): ExtensionId[_ <: Extension] = Jdbc
+ override def createExtension(system: ExtendedActorSystem): JdbcExtension = new JdbcExtension(system)
+}
+
+class JdbcExtension(system: ExtendedActorSystem) extends Kamon.Extension {
+ private val config = system.settings.config.getConfig("kamon.jdbc")
+ private val slowQueryProcessorClass = config.getString("slow-query-processor")
+ private val slowQueryProcessor: SlowQueryProcessor = system.dynamicAccess.createInstanceFor[SlowQueryProcessor](slowQueryProcessorClass, Nil).get
+
+ val slowQueryThreshold = config.getDuration("slow-query-threshold", milliseconds)
+
+ def processSlowQuery(sql: String, executionTime: Long) = slowQueryProcessor.process(sql, executionTime, slowQueryThreshold)
+}
+
+trait SlowQueryProcessor {
+ def process(sql: String, executionTime: Long, queryThreshold: Long): Unit
+}
+
+class DefaultSlowQueryProcessor extends SlowQueryProcessor {
+ import org.slf4j.LoggerFactory
+
+ val log = LoggerFactory.getLogger("slow-query-processor")
+
+ override def process(sql: String, executionTime: Long, queryThreshold: Long): Unit = {
+ log.warn(s"The query: $sql took ${executionTime}ms and the slow query threshold is ${queryThreshold}ms")
+ }
+}
diff --git a/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala b/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala
new file mode 100644
index 00000000..faa068aa
--- /dev/null
+++ b/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala
@@ -0,0 +1,104 @@
+/* =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.jdbc.instrumentation
+
+import java.sql.SQLException
+
+import akka.actor.ActorSystem
+import kamon.Kamon
+import kamon.jdbc.Jdbc
+import kamon.jdbc.metric.StatementsMetrics
+import kamon.jdbc.metric.StatementsMetrics.StatementsMetricsRecorder
+import kamon.metric.Metrics
+import kamon.trace.TraceRecorder
+import org.aspectj.lang.ProceedingJoinPoint
+import org.aspectj.lang.annotation.{ AfterThrowing, Around, Aspect, Pointcut }
+import org.slf4j.LoggerFactory
+
+@Aspect
+class StatementInstrumentation {
+
+ import StatementInstrumentation._
+
+ @volatile var statementRecorder: Option[StatementsMetricsRecorder] = Option.empty
+
+ @Pointcut("execution(* java.sql.Statement.execute*(..)) && args(sql)")
+ def onExecuteStatement(sql: String): Unit = {}
+
+ @Pointcut("execution(* java.sql.Connection.prepareStatement(..)) && args(sql)")
+ def onExecutePreparedStatement(sql: String): Unit = {}
+
+ @Pointcut("execution(* java.sql.Connection.prepareCall(..)) && args(sql)")
+ def onExecutePreparedCall(sql: String): Unit = {}
+
+ @Around("onExecuteStatement(sql) || onExecutePreparedStatement(sql) || onExecutePreparedCall(sql)")
+ def aroundExecuteStatement(pjp: ProceedingJoinPoint, sql: String): Any = {
+ TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒
+
+ if (statementRecorder.nonEmpty) {
+ statementRecorder = Kamon(Metrics)(system).register(StatementsMetrics("Statements"), StatementsMetrics.Factory)
+ }
+
+ sql.replaceAll(CommentPattern, "") match {
+ case SelectStatement(_) ⇒ recordRead(pjp, sql)(system)
+ case InsertStatement(_) | UpdateStatement(_) | DeleteStatement(_) ⇒ recordWrite(pjp)
+ case anythingElse ⇒
+ log.debug(s"Unable to parse sql [$sql]")
+ pjp.proceed()
+ }
+ } getOrElse pjp.proceed()
+ }
+
+ @AfterThrowing(pointcut = "onExecuteStatement(sql) || onExecutePreparedStatement(sql) || onExecutePreparedCall(sql)", throwing = "ex")
+ def onError(sql: String, ex:SQLException): Unit = {
+ log.error(s"the query [$sql] failed with exception [${ex.getMessage}]")
+ statementRecorder.map(stmr ⇒ stmr.errors.increment())
+ }
+
+ def withTimeSpent[A](thunk: ⇒ A)(timeSpent: Long ⇒ Unit): A = {
+ val start = System.nanoTime()
+ try thunk finally timeSpent(System.nanoTime() - start)
+ }
+
+ def recordRead(pjp: ProceedingJoinPoint, sql: String)(system: ActorSystem): Any = {
+ withTimeSpent(pjp.proceed()) {
+ timeSpent ⇒
+ statementRecorder.map(stmr ⇒ stmr.reads.record(timeSpent))
+
+ if (timeSpent >= Jdbc(system).slowQueryThreshold) {
+ statementRecorder.map(stmr ⇒ stmr.slow.increment())
+ Jdbc(system).processSlowQuery(sql, timeSpent)
+ }
+ }
+ }
+
+ def recordWrite(pjp: ProceedingJoinPoint): Any = {
+ withTimeSpent(pjp.proceed()) {
+ timeSpent ⇒ statementRecorder.map(stmr ⇒ stmr.writes.record(timeSpent))
+ }
+ }
+}
+
+object StatementInstrumentation {
+ val log = LoggerFactory.getLogger("StatementInstrumentation")
+
+ val SelectStatement = "(?i)^\\s*select.*?\\sfrom[\\s\\[]+([^\\]\\s,)(;]*).*".r
+ val InsertStatement = "(?i)^\\s*insert(?:\\s+ignore)?\\s+into\\s+([^\\s(,;]*).*".r
+ val UpdateStatement = "(?i)^\\s*update\\s+([^\\s,;]*).*".r
+ val DeleteStatement = "(?i)^\\s*delete\\s+from\\s+([^\\s,(;]*).*".r
+ val CommentPattern = "/\\*.*?\\*/" //for now only removes comments of kind / * anything * /
+}
+
diff --git a/kamon-jdbc/src/main/scala/kamon/jdbc/metric/StatementsMetrics.scala b/kamon-jdbc/src/main/scala/kamon/jdbc/metric/StatementsMetrics.scala
new file mode 100644
index 00000000..bc52c249
--- /dev/null
+++ b/kamon-jdbc/src/main/scala/kamon/jdbc/metric/StatementsMetrics.scala
@@ -0,0 +1,83 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.jdbc.metric
+
+import akka.actor.ActorSystem
+import com.typesafe.config.Config
+import kamon.metric.{ CollectionContext, MetricGroupCategory, MetricGroupFactory, MetricGroupIdentity, MetricGroupRecorder, MetricGroupSnapshot, MetricIdentity, MetricSnapshot }
+import kamon.metric.instrument.{ Counter, Histogram }
+
+case class StatementsMetrics(name: String) extends MetricGroupIdentity {
+ val category = StatementsMetrics
+}
+
+object StatementsMetrics extends MetricGroupCategory {
+ val name = "jdbc-statements"
+
+ case object Writes extends MetricIdentity { val name = "writes" }
+ case object Reads extends MetricIdentity { val name = "reads" }
+ case object Slows extends MetricIdentity { val name = "slow-queries" }
+ case object Errors extends MetricIdentity { val name = "errors" }
+
+ case class StatementsMetricsRecorder(writes: Histogram, reads: Histogram, slow: Counter, errors: Counter)
+ extends MetricGroupRecorder {
+
+ def collect(context: CollectionContext): MetricGroupSnapshot = {
+ StatementsMetricsSnapshot(writes.collect(context), reads.collect(context), slow.collect(context), errors.collect(context))
+ }
+
+ def cleanup: Unit = {}
+ }
+
+ case class StatementsMetricsSnapshot(writes: Histogram.Snapshot, reads: Histogram.Snapshot, slow: Counter.Snapshot, errors: Counter.Snapshot)
+ extends MetricGroupSnapshot {
+
+ type GroupSnapshotType = StatementsMetricsSnapshot
+
+ def merge(that: StatementsMetricsSnapshot, context: CollectionContext): GroupSnapshotType = {
+ StatementsMetricsSnapshot(writes.merge(that.writes, context), reads.merge(that.reads, context), slow.merge(that.slow, context), errors.merge(that.errors, context))
+ }
+
+ lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map(
+ Writes -> writes,
+ Reads -> reads,
+ Slows -> slow,
+ Reads -> errors)
+ }
+
+ val Factory = StatementsMetricsGroupFactory
+}
+
+case object StatementsMetricsGroupFactory extends MetricGroupFactory {
+ import kamon.jdbc.metric.StatementsMetrics._
+
+ type GroupRecorder = StatementsMetricsRecorder
+
+ def create(config: Config, system: ActorSystem): GroupRecorder = {
+ val settings = config.getConfig("precision.jdbc.statements")
+
+ val writesConfig = settings.getConfig("writes")
+ val readsConfig = settings.getConfig("reads")
+
+ new StatementsMetricsRecorder(
+ Histogram.fromConfig(writesConfig),
+ Histogram.fromConfig(readsConfig),
+ Counter(),
+ Counter())
+ }
+}
+
diff --git a/kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala b/kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala
new file mode 100644
index 00000000..ec8ca4a1
--- /dev/null
+++ b/kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala
@@ -0,0 +1,67 @@
+/* =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License") you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.jdbc.instrumentation
+
+import java.sql.{SQLException, DriverManager}
+
+import akka.actor.ActorSystem
+import akka.testkit.TestKit
+import kamon.trace.TraceRecorder
+import org.scalatest.{Matchers, WordSpecLike}
+
+class StatementInstrumentationSpec extends TestKit(ActorSystem("jdbc-spec")) with WordSpecLike with Matchers {
+
+ val connection = DriverManager.getConnection("jdbc:h2:mem:test","SA", "")
+
+ "the StatementInstrumentation" should {
+ "bblabals" in {
+ TraceRecorder.withNewTraceContext("jdbc-trace") {
+ connection should not be null
+
+ val create = "CREATE TABLE Address (Nr INTEGER, Name VARCHAR(128));"
+ val createStatement = connection.createStatement()
+ createStatement.executeUpdate(create)
+
+ val insert = "INSERT INTO Address (Nr, Name) VALUES(1, 'foo')"
+ val insertStatement = connection.prepareStatement(insert)
+ insertStatement.execute()
+
+ val select =
+ """
+ |/*this is a comment*/
+ |SELECT * FROM Address""".stripMargin
+ val selectStatement = connection.prepareCall(select)
+ selectStatement.execute()
+
+ val update = "UPDATE Address SET Name = 'bar' where Nr = 1"
+ val updateStatement = connection.createStatement()
+ updateStatement.execute(update)
+
+ val delete = "DELETE FROM Address where Nr = 1"
+ val deleteStatement = connection.createStatement()
+ deleteStatement.execute(delete)
+
+ intercept[SQLException] {
+ val error = "SELECT * FROM NON_EXIST_TABLE"
+ val errorStatement = connection.createStatement()
+ errorStatement.execute(error)
+ }
+
+ }
+ }
+ }
+}
+
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 4eec01c1..8ff42314 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -27,6 +27,7 @@ object Dependencies {
val aspectjVersion = "1.8.4"
val slf4jVersion = "1.7.6"
val playVersion = "2.3.5"
+ val sigarVersion = "1.6.5.132"
val sprayJson = "io.spray" %% "spray-json" % "1.3.0"
val sprayJsonLenses = "net.virtual-void" %% "json-lenses" % "0.5.4"
@@ -50,8 +51,10 @@ object Dependencies {
val slf4Api = "org.slf4j" % "slf4j-api" % slf4jVersion
val slf4nop = "org.slf4j" % "slf4j-nop" % slf4jVersion
val scalaCompiler = "org.scala-lang" % "scala-compiler" % Settings.ScalaVersion
+ val sigar = "org.fusesource" % "sigar" % "1.6.4" // TODO remove
val scalazConcurrent = "org.scalaz" %% "scalaz-concurrent" % "7.1.0"
- val sigarLoader = "io.kamon" % "sigar-loader" % "1.6.5-rev001"
+ val sigarLoader = "io.kamon"
+ val h2 = "com.h2database" % "sigar-loader" % "1.6.5-rev001"
def compile (deps: ModuleID*): Seq[ModuleID] = deps map (_ % "compile")
def provided (deps: ModuleID*): Seq[ModuleID] = deps map (_ % "provided")
diff --git a/project/Projects.scala b/project/Projects.scala
index a284da11..710bdc9f 100644
--- a/project/Projects.scala
+++ b/project/Projects.scala
@@ -171,6 +171,17 @@ object Projects extends Build {
compile(sigarLoader) ++
test(scalatest, akkaTestKit, slf4Api, slf4nop))
.dependsOn(kamonCore)
+
+lazy val kamonJdbc = Project("kamon-jdbc", file("kamon-jdbc"))
+ .settings(basicSettings: _*)
+ .settings(formatSettings: _*)
+ .settings(aspectJSettings: _*)
+ .settings(
+ libraryDependencies ++=
+ test(h2,scalatest, akkaTestKit, slf4Api, slf4nop) ++
+ provided(aspectJ))
+ .dependsOn(kamonCore)
+
val noPublishing = Seq(publish := (), publishLocal := (), publishArtifact := false)
}