From ff9c796e960a2292393d6c2c351d527c2773bcbb Mon Sep 17 00:00:00 2001 From: Diego Date: Wed, 26 Nov 2014 22:22:34 -0300 Subject: + kamon-jdbc: introduce SqlErrorProcessor --- kamon-jdbc/src/main/resources/reference.conf | 3 ++ kamon-jdbc/src/main/scala/kamon/jdbc/Jdbc.scala | 21 ++++++++++- .../instrumentation/StatementInstrumentation.scala | 44 ++++++++++++---------- .../StatementInstrumentationSpec.scala | 9 ++++- 4 files changed, 55 insertions(+), 22 deletions(-) diff --git a/kamon-jdbc/src/main/resources/reference.conf b/kamon-jdbc/src/main/resources/reference.conf index ce114861..bbe98127 100644 --- a/kamon-jdbc/src/main/resources/reference.conf +++ b/kamon-jdbc/src/main/resources/reference.conf @@ -8,6 +8,9 @@ kamon { # Fully qualified name of the implementation of kamon.jdbc.SlowQueryProcessor. slow-query-processor = kamon.jdbc.DefaultSlowQueryProcessor + + # Fully qualified name of the implementation of kamon.jdbc.SqlErrorProcessor. + sql-error-processor = kamon.jdbc.DefaultSqlErrorProcessor } metrics { diff --git a/kamon-jdbc/src/main/scala/kamon/jdbc/Jdbc.scala b/kamon-jdbc/src/main/scala/kamon/jdbc/Jdbc.scala index 3199d27b..9de65821 100644 --- a/kamon-jdbc/src/main/scala/kamon/jdbc/Jdbc.scala +++ b/kamon-jdbc/src/main/scala/kamon/jdbc/Jdbc.scala @@ -27,22 +27,41 @@ object Jdbc extends ExtensionId[JdbcExtension] with ExtensionIdProvider { class JdbcExtension(system: ExtendedActorSystem) extends Kamon.Extension { private val config = system.settings.config.getConfig("kamon.jdbc") + private val slowQueryProcessorClass = config.getString("slow-query-processor") private val slowQueryProcessor: SlowQueryProcessor = system.dynamicAccess.createInstanceFor[SlowQueryProcessor](slowQueryProcessorClass, Nil).get + private val sqlErrorProcessorClass = config.getString("sql-error-processor") + private val sqlErrorProcessor: SqlErrorProcessor = system.dynamicAccess.createInstanceFor[SqlErrorProcessor](sqlErrorProcessorClass, Nil).get + val slowQueryThreshold = config.getDuration("slow-query-threshold", milliseconds) def processSlowQuery(sql: String, executionTime: Long) = slowQueryProcessor.process(sql, executionTime, slowQueryThreshold) + def processSqlError(sql: String, ex: Throwable) = sqlErrorProcessor.process(sql, ex) } trait SlowQueryProcessor { def process(sql: String, executionTime: Long, queryThreshold: Long): Unit } +trait SqlErrorProcessor { + def process(sql: String, ex: Throwable): Unit +} + +class DefaultSqlErrorProcessor extends SqlErrorProcessor { + import org.slf4j.LoggerFactory + + val log = LoggerFactory.getLogger(classOf[DefaultSqlErrorProcessor]) + + override def process(sql: String, cause: Throwable): Unit = { + log.error(s"the query [$sql] failed with exception [${cause.getMessage}]") + } +} + class DefaultSlowQueryProcessor extends SlowQueryProcessor { import org.slf4j.LoggerFactory - val log = LoggerFactory.getLogger("slow-query-processor") + val log = LoggerFactory.getLogger(classOf[DefaultSlowQueryProcessor]) override def process(sql: String, executionTimeInMillis: Long, queryThresholdInMillis: Long): Unit = { log.warn(s"The query [$sql] took $executionTimeInMillis ms and the slow query threshold is $queryThresholdInMillis ms") diff --git a/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala b/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala index 494c75ca..41eef716 100644 --- a/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala +++ b/kamon-jdbc/src/main/scala/kamon/jdbc/instrumentation/StatementInstrumentation.scala @@ -15,27 +15,26 @@ package kamon.jdbc.instrumentation -import java.sql.SQLException import java.util.concurrent.TimeUnit.{ NANOSECONDS ⇒ nanos } import akka.actor.ActorSystem import kamon.Kamon import kamon.jdbc.Jdbc import kamon.jdbc.metric.StatementsMetrics -import kamon.jdbc.metric.StatementsMetrics.StatementsMetricsRecorder +import kamon.jdbc.metric.StatementsMetricsGroupFactory.GroupRecorder import kamon.metric.Metrics import kamon.trace.TraceRecorder import org.aspectj.lang.ProceedingJoinPoint -import org.aspectj.lang.annotation.{ AfterThrowing, Around, Aspect, Pointcut } +import org.aspectj.lang.annotation.{ Around, Aspect, Pointcut } import org.slf4j.LoggerFactory +import scala.util.control.NonFatal + @Aspect class StatementInstrumentation { import StatementInstrumentation._ - @volatile var statementRecorder: Option[StatementsMetricsRecorder] = Option.empty - @Pointcut("call(* java.sql.Statement.execute*(..)) && args(sql)") def onExecuteStatement(sql: String): Unit = {} @@ -49,13 +48,11 @@ class StatementInstrumentation { def aroundExecuteStatement(pjp: ProceedingJoinPoint, sql: String): Any = { TraceRecorder.withTraceContextAndSystem { (ctx, system) ⇒ - if (statementRecorder.isEmpty) { - statementRecorder = Kamon(Metrics)(system).register(StatementsMetrics(Statements), StatementsMetrics.Factory) - } + implicit val statementRecorder: Option[GroupRecorder] = Kamon(Metrics)(system).register(StatementsMetrics(Statements), StatementsMetrics.Factory) sql.replaceAll(CommentPattern, Empty) match { - case SelectStatement(_) ⇒ recordRead(pjp, sql)(system) - case InsertStatement(_) | UpdateStatement(_) | DeleteStatement(_) ⇒ recordWrite(pjp) + case SelectStatement(_) ⇒ recordRead(pjp, sql, system) + case InsertStatement(_) | UpdateStatement(_) | DeleteStatement(_) ⇒ recordWrite(pjp, sql, system) case anythingElse ⇒ log.debug(s"Unable to parse sql [$sql]") pjp.proceed() @@ -63,19 +60,13 @@ class StatementInstrumentation { } } getOrElse pjp.proceed() - @AfterThrowing(pointcut = "onExecuteStatement(sql) || onExecutePreparedStatement(sql) || onExecutePreparedCall(sql)", throwing = "ex") - def onError(sql: String, ex: SQLException): Unit = { - log.error(s"the query [$sql] failed with exception [${ex.getMessage}]") - statementRecorder.map(stmr ⇒ stmr.errors.increment()) - } - def withTimeSpent[A](thunk: ⇒ A)(timeSpent: Long ⇒ Unit): A = { val start = System.nanoTime() try thunk finally timeSpent(System.nanoTime() - start) } - def recordRead(pjp: ProceedingJoinPoint, sql: String)(system: ActorSystem): Any = { - withTimeSpent(pjp.proceed()) { timeSpent ⇒ + def recordRead(pjp: ProceedingJoinPoint, sql: String, system: ActorSystem)(implicit statementRecorder: Option[GroupRecorder]): Any = { + withTimeSpent(pjp.proceedWithErrorHandler(sql, system)) { timeSpent ⇒ statementRecorder.map(stmr ⇒ stmr.reads.record(timeSpent)) val timeSpentInMillis = nanos.toMillis(timeSpent) @@ -87,8 +78,8 @@ class StatementInstrumentation { } } - def recordWrite(pjp: ProceedingJoinPoint): Any = { - withTimeSpent(pjp.proceed()) { timeSpent ⇒ + def recordWrite(pjp: ProceedingJoinPoint, sql: String, system: ActorSystem)(implicit statementRecorder: Option[GroupRecorder]): Any = { + withTimeSpent(pjp.proceedWithErrorHandler(sql, system)) { timeSpent ⇒ statementRecorder.map(stmr ⇒ stmr.writes.record(timeSpent)) } } @@ -104,5 +95,18 @@ object StatementInstrumentation { val CommentPattern = "/\\*.*?\\*/" //for now only removes comments of kind / * anything * / val Empty = "" val Statements = "jdbc-statements" + + implicit class PimpedProceedingJoinPoint(pjp: ProceedingJoinPoint) { + def proceedWithErrorHandler(sql: String, system: ActorSystem)(implicit statementRecorder: Option[GroupRecorder]): Any = { + try { + pjp.proceed() + } catch { + case NonFatal(cause) ⇒ + Jdbc(system).processSqlError(sql, cause) + statementRecorder.map(stmr ⇒ stmr.errors.increment()) + throw cause + } + } + } } diff --git a/kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala b/kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala index ffd5a867..4b400727 100644 --- a/kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala +++ b/kamon-jdbc/src/test/scala/kamon/jdbc/instrumentation/StatementInstrumentationSpec.scala @@ -21,7 +21,7 @@ import akka.actor.ActorSystem import akka.testkit.{ TestKitBase, TestProbe } import com.typesafe.config.ConfigFactory import kamon.Kamon -import kamon.jdbc.SlowQueryProcessor +import kamon.jdbc.{ SqlErrorProcessor, SlowQueryProcessor } import kamon.jdbc.metric.StatementsMetrics import kamon.jdbc.metric.StatementsMetrics.StatementsMetricsSnapshot import kamon.metric.Metrics @@ -41,6 +41,9 @@ class StatementInstrumentationSpec extends TestKitBase with WordSpecLike with Ma | | # Fully qualified name of the implementation of kamon.jdbc.SlowQueryProcessor. | slow-query-processor = kamon.jdbc.instrumentation.NOPSlowQueryProcessor + | + | # Fully qualified name of the implementation of kamon.jdbc.SqlErrorProcessor. + | sql-error-processor = kamon.jdbc.instrumentation.NOPSqlErrorProcessor | } |} """.stripMargin)) @@ -182,3 +185,7 @@ class NOPSlowQueryProcessor extends SlowQueryProcessor { override def process(sql: String, executionTimeInMillis: Long, queryThresholdInMillis: Long): Unit = { /*do nothing!!!*/ } } +class NOPSqlErrorProcessor extends SqlErrorProcessor { + override def process(sql: String, ex: Throwable): Unit = { /*do nothing!!!*/ } +} + -- cgit v1.2.3