aboutsummaryrefslogtreecommitdiff
path: root/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala')
-rw-r--r--kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala122
1 files changed, 122 insertions, 0 deletions
diff --git a/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala b/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala
new file mode 100644
index 00000000..d169a4c7
--- /dev/null
+++ b/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala
@@ -0,0 +1,122 @@
+/* =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.jdbc.instrumentation
+
+import java.util.concurrent.TimeUnit.{ NANOSECONDS ⇒ nanos }
+
+import kamon.Kamon
+import kamon.jdbc.{ JdbcExtension, Jdbc }
+import kamon.jdbc.metric.StatementsMetrics
+import kamon.trace.{ TraceContext, SegmentCategory }
+import org.aspectj.lang.ProceedingJoinPoint
+import org.aspectj.lang.annotation.{ Around, Aspect, Pointcut }
+import org.slf4j.LoggerFactory
+
+import scala.util.control.NonFatal
+
+@Aspect
+class StatementInstrumentation {
+
+ import StatementInstrumentation._
+
+ @Pointcut("call(* java.sql.Statement.execute*(..)) && args(sql)")
+ def onExecuteStatement(sql: String): Unit = {}
+
+ @Pointcut("call(* java.sql.Connection.prepareStatement(..)) && args(sql)")
+ def onExecutePreparedStatement(sql: String): Unit = {}
+
+ @Pointcut("call(* java.sql.Connection.prepareCall(..)) && args(sql)")
+ def onExecutePreparedCall(sql: String): Unit = {}
+
+ @Around("onExecuteStatement(sql) || onExecutePreparedStatement(sql) || onExecutePreparedCall(sql)")
+ def aroundExecuteStatement(pjp: ProceedingJoinPoint, sql: String): Any = {
+ TraceContext.map { ctx ⇒
+ val metricsExtension = Kamon.metrics
+ val jdbcExtension = Kamon(Jdbc)
+ implicit val statementRecorder = metricsExtension.register(StatementsMetrics, "jdbc-statements").map(_.recorder)
+
+ sql.replaceAll(CommentPattern, Empty) match {
+ case SelectStatement(_) ⇒ withSegment(ctx, Select, jdbcExtension)(recordRead(pjp, sql, jdbcExtension))
+ case InsertStatement(_) ⇒ withSegment(ctx, Insert, jdbcExtension)(recordWrite(pjp, sql, jdbcExtension))
+ case UpdateStatement(_) ⇒ withSegment(ctx, Update, jdbcExtension)(recordWrite(pjp, sql, jdbcExtension))
+ case DeleteStatement(_) ⇒ withSegment(ctx, Delete, jdbcExtension)(recordWrite(pjp, sql, jdbcExtension))
+ case anythingElse ⇒
+ log.debug(s"Unable to parse sql [$sql]")
+ pjp.proceed()
+ }
+ }
+ } getOrElse pjp.proceed()
+
+ def withTimeSpent[A](thunk: ⇒ A)(timeSpent: Long ⇒ Unit): A = {
+ val start = System.nanoTime()
+ try thunk finally timeSpent(System.nanoTime() - start)
+ }
+
+ def withSegment[A](ctx: TraceContext, statement: String, jdbcExtension: JdbcExtension)(thunk: ⇒ A): A = {
+ val segmentName = jdbcExtension.generateJdbcSegmentName(statement)
+ val segment = ctx.startSegment(segmentName, SegmentCategory.Database, Jdbc.SegmentLibraryName)
+ try thunk finally segment.finish()
+ }
+
+ def recordRead(pjp: ProceedingJoinPoint, sql: String, jdbcExtension: JdbcExtension)(implicit statementRecorder: Option[StatementsMetrics]): Any = {
+ withTimeSpent(pjp.proceedWithErrorHandler(sql, jdbcExtension)) { timeSpent ⇒
+ statementRecorder.map(stmr ⇒ stmr.reads.record(timeSpent))
+
+ val timeSpentInMillis = nanos.toMillis(timeSpent)
+
+ if (timeSpentInMillis >= jdbcExtension.slowQueryThreshold) {
+ statementRecorder.map(stmr ⇒ stmr.slows.increment())
+ jdbcExtension.processSlowQuery(sql, timeSpentInMillis)
+ }
+ }
+ }
+
+ def recordWrite(pjp: ProceedingJoinPoint, sql: String, jdbcExtension: JdbcExtension)(implicit statementRecorder: Option[StatementsMetrics]): Any = {
+ withTimeSpent(pjp.proceedWithErrorHandler(sql, jdbcExtension)) { timeSpent ⇒
+ statementRecorder.map(stmr ⇒ stmr.writes.record(timeSpent))
+ }
+ }
+}
+
+object StatementInstrumentation {
+ val log = LoggerFactory.getLogger(classOf[StatementInstrumentation])
+
+ val SelectStatement = "(?i)^\\s*select.*?\\sfrom[\\s\\[]+([^\\]\\s,)(;]*).*".r
+ val InsertStatement = "(?i)^\\s*insert(?:\\s+ignore)?\\s+into\\s+([^\\s(,;]*).*".r
+ val UpdateStatement = "(?i)^\\s*update\\s+([^\\s,;]*).*".r
+ val DeleteStatement = "(?i)^\\s*delete\\s+from\\s+([^\\s,(;]*).*".r
+ val CommentPattern = "/\\*.*?\\*/" //for now only removes comments of kind / * anything * /
+ val Empty = ""
+ val Statements = "jdbc-statements"
+ val Select = "Select"
+ val Insert = "Insert"
+ val Update = "Update"
+ val Delete = "Delete"
+
+ implicit class PimpedProceedingJoinPoint(pjp: ProceedingJoinPoint) {
+ def proceedWithErrorHandler(sql: String, jdbcExtension: JdbcExtension)(implicit statementRecorder: Option[StatementsMetrics]): Any = {
+ try {
+ pjp.proceed()
+ } catch {
+ case NonFatal(cause) ⇒
+ jdbcExtension.processSqlError(sql, cause)
+ statementRecorder.map(stmr ⇒ stmr.errors.increment())
+ throw cause
+ }
+ }
+ }
+}
+