diff options
Diffstat (limited to 'kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala')
-rw-r--r-- | kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala | 122 |
1 files changed, 122 insertions, 0 deletions
diff --git a/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala b/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala new file mode 100644 index 00000000..d169a4c7 --- /dev/null +++ b/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala @@ -0,0 +1,122 @@ +/* ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.jdbc.instrumentation + +import java.util.concurrent.TimeUnit.{ NANOSECONDS ⇒ nanos } + +import kamon.Kamon +import kamon.jdbc.{ JdbcExtension, Jdbc } +import kamon.jdbc.metric.StatementsMetrics +import kamon.trace.{ TraceContext, SegmentCategory } +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation.{ Around, Aspect, Pointcut } +import org.slf4j.LoggerFactory + +import scala.util.control.NonFatal + +@Aspect +class StatementInstrumentation { + + import StatementInstrumentation._ + + @Pointcut("call(* java.sql.Statement.execute*(..)) && args(sql)") + def onExecuteStatement(sql: String): Unit = {} + + @Pointcut("call(* java.sql.Connection.prepareStatement(..)) && args(sql)") + def onExecutePreparedStatement(sql: String): Unit = {} + + @Pointcut("call(* java.sql.Connection.prepareCall(..)) && args(sql)") + def onExecutePreparedCall(sql: String): Unit = {} + + @Around("onExecuteStatement(sql) || onExecutePreparedStatement(sql) || onExecutePreparedCall(sql)") + def aroundExecuteStatement(pjp: ProceedingJoinPoint, sql: String): Any = { + TraceContext.map { ctx ⇒ + val metricsExtension = Kamon.metrics + val jdbcExtension = Kamon(Jdbc) + implicit val statementRecorder = metricsExtension.register(StatementsMetrics, "jdbc-statements").map(_.recorder) + + sql.replaceAll(CommentPattern, Empty) match { + case SelectStatement(_) ⇒ withSegment(ctx, Select, jdbcExtension)(recordRead(pjp, sql, jdbcExtension)) + case InsertStatement(_) ⇒ withSegment(ctx, Insert, jdbcExtension)(recordWrite(pjp, sql, jdbcExtension)) + case UpdateStatement(_) ⇒ withSegment(ctx, Update, jdbcExtension)(recordWrite(pjp, sql, jdbcExtension)) + case DeleteStatement(_) ⇒ withSegment(ctx, Delete, jdbcExtension)(recordWrite(pjp, sql, jdbcExtension)) + case anythingElse ⇒ + log.debug(s"Unable to parse sql [$sql]") + pjp.proceed() + } + } + } getOrElse pjp.proceed() + + def withTimeSpent[A](thunk: ⇒ A)(timeSpent: Long ⇒ Unit): A = { + val start = System.nanoTime() + try thunk finally timeSpent(System.nanoTime() - start) + } + + def withSegment[A](ctx: TraceContext, statement: String, jdbcExtension: JdbcExtension)(thunk: ⇒ A): A = { + val segmentName = jdbcExtension.generateJdbcSegmentName(statement) + val segment = ctx.startSegment(segmentName, SegmentCategory.Database, Jdbc.SegmentLibraryName) + try thunk finally segment.finish() + } + + def recordRead(pjp: ProceedingJoinPoint, sql: String, jdbcExtension: JdbcExtension)(implicit statementRecorder: Option[StatementsMetrics]): Any = { + withTimeSpent(pjp.proceedWithErrorHandler(sql, jdbcExtension)) { timeSpent ⇒ + statementRecorder.map(stmr ⇒ stmr.reads.record(timeSpent)) + + val timeSpentInMillis = nanos.toMillis(timeSpent) + + if (timeSpentInMillis >= jdbcExtension.slowQueryThreshold) { + statementRecorder.map(stmr ⇒ stmr.slows.increment()) + jdbcExtension.processSlowQuery(sql, timeSpentInMillis) + } + } + } + + def recordWrite(pjp: ProceedingJoinPoint, sql: String, jdbcExtension: JdbcExtension)(implicit statementRecorder: Option[StatementsMetrics]): Any = { + withTimeSpent(pjp.proceedWithErrorHandler(sql, jdbcExtension)) { timeSpent ⇒ + statementRecorder.map(stmr ⇒ stmr.writes.record(timeSpent)) + } + } +} + +object StatementInstrumentation { + val log = LoggerFactory.getLogger(classOf[StatementInstrumentation]) + + val SelectStatement = "(?i)^\\s*select.*?\\sfrom[\\s\\[]+([^\\]\\s,)(;]*).*".r + val InsertStatement = "(?i)^\\s*insert(?:\\s+ignore)?\\s+into\\s+([^\\s(,;]*).*".r + val UpdateStatement = "(?i)^\\s*update\\s+([^\\s,;]*).*".r + val DeleteStatement = "(?i)^\\s*delete\\s+from\\s+([^\\s,(;]*).*".r + val CommentPattern = "/\\*.*?\\*/" //for now only removes comments of kind / * anything * / + val Empty = "" + val Statements = "jdbc-statements" + val Select = "Select" + val Insert = "Insert" + val Update = "Update" + val Delete = "Delete" + + implicit class PimpedProceedingJoinPoint(pjp: ProceedingJoinPoint) { + def proceedWithErrorHandler(sql: String, jdbcExtension: JdbcExtension)(implicit statementRecorder: Option[StatementsMetrics]): Any = { + try { + pjp.proceed() + } catch { + case NonFatal(cause) ⇒ + jdbcExtension.processSqlError(sql, cause) + statementRecorder.map(stmr ⇒ stmr.errors.increment()) + throw cause + } + } + } +} + |