diff options
Diffstat (limited to 'kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation')
-rw-r--r-- | kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala | 44 |
1 files changed, 24 insertions, 20 deletions
diff --git a/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala b/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala index 494c75ca..41eef716 100644 --- a/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala +++ b/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala @@ -15,27 +15,26 @@ package kamon.jdbc.instrumentation -import java.sql.SQLException import java.util.concurrent.TimeUnit.{ NANOSECONDS ⇒ nanos } import akka.actor.ActorSystem import kamon.Kamon import kamon.jdbc.Jdbc import kamon.jdbc.metric.StatementsMetrics -import kamon.jdbc.metric.StatementsMetrics.StatementsMetricsRecorder +import kamon.jdbc.metric.StatementsMetricsGroupFactory.GroupRecorder import kamon.metric.Metrics import kamon.trace.TraceRecorder import org.aspectj.lang.ProceedingJoinPoint -import org.aspectj.lang.annotation.{ AfterThrowing, Around, Aspect, Pointcut } +import org.aspectj.lang.annotation.{ Around, Aspect, Pointcut } import org.slf4j.LoggerFactory +import scala.util.control.NonFatal + @Aspect class StatementInstrumentation { import StatementInstrumentation._ - @volatile var statementRecorder: Option[StatementsMetricsRecorder] = Option.empty - @Pointcut("call(* java.sql.Statement.execute*(..)) && args(sql)") def onExecuteStatement(sql: String): Unit = {} @@ -49,13 +48,11 @@ class StatementInstrumentation { def aroundExecuteStatement(pjp: ProceedingJoinPoint, sql: String): Any = { TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒ - if (statementRecorder.isEmpty) { - statementRecorder = Kamon(Metrics)(system).register(StatementsMetrics(Statements), StatementsMetrics.Factory) - } + implicit val statementRecorder: Option[GroupRecorder] = Kamon(Metrics)(system).register(StatementsMetrics(Statements), StatementsMetrics.Factory) sql.replaceAll(CommentPattern, Empty) match { - case SelectStatement(_) ⇒ recordRead(pjp, sql)(system) - case InsertStatement(_) | UpdateStatement(_) | DeleteStatement(_) ⇒ recordWrite(pjp) + case SelectStatement(_) ⇒ recordRead(pjp, sql, system) + case InsertStatement(_) | UpdateStatement(_) | DeleteStatement(_) ⇒ recordWrite(pjp, sql, system) case anythingElse ⇒ log.debug(s"Unable to parse sql [$sql]") pjp.proceed() @@ -63,19 +60,13 @@ class StatementInstrumentation { } } getOrElse pjp.proceed() - @AfterThrowing(pointcut = "onExecuteStatement(sql) || onExecutePreparedStatement(sql) || onExecutePreparedCall(sql)", throwing = "ex") - def onError(sql: String, ex: SQLException): Unit = { - log.error(s"the query [$sql] failed with exception [${ex.getMessage}]") - statementRecorder.map(stmr ⇒ stmr.errors.increment()) - } - def withTimeSpent[A](thunk: ⇒ A)(timeSpent: Long ⇒ Unit): A = { val start = System.nanoTime() try thunk finally timeSpent(System.nanoTime() - start) } - def recordRead(pjp: ProceedingJoinPoint, sql: String)(system: ActorSystem): Any = { - withTimeSpent(pjp.proceed()) { timeSpent ⇒ + def recordRead(pjp: ProceedingJoinPoint, sql: String, system: ActorSystem)(implicit statementRecorder: Option[GroupRecorder]): Any = { + withTimeSpent(pjp.proceedWithErrorHandler(sql, system)) { timeSpent ⇒ statementRecorder.map(stmr ⇒ stmr.reads.record(timeSpent)) val timeSpentInMillis = nanos.toMillis(timeSpent) @@ -87,8 +78,8 @@ class StatementInstrumentation { } } - def recordWrite(pjp: ProceedingJoinPoint): Any = { - withTimeSpent(pjp.proceed()) { timeSpent ⇒ + def recordWrite(pjp: ProceedingJoinPoint, sql: String, system: ActorSystem)(implicit statementRecorder: Option[GroupRecorder]): Any = { + withTimeSpent(pjp.proceedWithErrorHandler(sql, system)) { timeSpent ⇒ statementRecorder.map(stmr ⇒ stmr.writes.record(timeSpent)) } } @@ -104,5 +95,18 @@ object StatementInstrumentation { val CommentPattern = "/\\*.*?\\*/" //for now only removes comments of kind / * anything * / val Empty = "" val Statements = "jdbc-statements" + + implicit class PimpedProceedingJoinPoint(pjp: ProceedingJoinPoint) { + def proceedWithErrorHandler(sql: String, system: ActorSystem)(implicit statementRecorder: Option[GroupRecorder]): Any = { + try { + pjp.proceed() + } catch { + case NonFatal(cause) ⇒ + Jdbc(system).processSqlError(sql, cause) + statementRecorder.map(stmr ⇒ stmr.errors.increment()) + throw cause + } + } + } } |