From 20f9920d4704e28ee8aa066d151b522c0d14e166 Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Fri, 4 Oct 2013 16:12:29 -0300 Subject: Simplify logging instrumentation and integrate it with kamon-core, kamon-ouw is gone. --- kamon-core/src/main/resources/META-INF/aop.xml | 2 +- kamon-core/src/main/resources/application.conf | 2 +- kamon-core/src/main/resources/logback.xml | 12 +++ kamon-core/src/main/scala/kamon/TraceContext.scala | 8 +- .../ActorLoggingInstrumentation.scala | 32 ++++++ .../SprayServerInstrumentation.scala | 4 +- .../src/main/scala/kamon/trace/UowDirectives.scala | 28 +++++ .../main/scala/test/SimpleRequestProcessor.scala | 14 ++- .../main/scala/kamon/logging/UowActorLogging.scala | 115 --------------------- .../main/scala/kamon/logging/UowDirectives.scala | 30 ------ project/Build.scala | 11 +- project/Dependencies.scala | 3 +- 12 files changed, 91 insertions(+), 170 deletions(-) create mode 100644 kamon-core/src/main/resources/logback.xml create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingInstrumentation.scala create mode 100644 kamon-core/src/main/scala/kamon/trace/UowDirectives.scala delete mode 100644 kamon-uow/src/main/scala/kamon/logging/UowActorLogging.scala delete mode 100644 kamon-uow/src/main/scala/kamon/logging/UowDirectives.scala diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop.xml index 104d7f78..349fc56d 100644 --- a/kamon-core/src/main/resources/META-INF/aop.xml +++ b/kamon-core/src/main/resources/META-INF/aop.xml @@ -14,7 +14,7 @@ - + diff --git a/kamon-core/src/main/resources/application.conf b/kamon-core/src/main/resources/application.conf index 1b564f7e..57f67d32 100644 --- a/kamon-core/src/main/resources/application.conf +++ b/kamon-core/src/main/resources/application.conf @@ -4,7 +4,7 @@ akka { log-dead-letters = on #extensions = ["kamon.dashboard.DashboardExtension"] - akka.loggers = ["kamon.newrelic.NewRelicErrorLogger"] + loggers = ["kamon.newrelic.NewRelicErrorLogger", "akka.event.slf4j.Slf4jLogger"] actor { default-dispatcher { diff --git a/kamon-core/src/main/resources/logback.xml b/kamon-core/src/main/resources/logback.xml new file mode 100644 index 00000000..2ae1e3bd --- /dev/null +++ b/kamon-core/src/main/resources/logback.xml @@ -0,0 +1,12 @@ + + + + %date{HH:mm:ss.SSS} %-5level [%X{uow}][%X{requestId}] [%thread] %logger{55} - %msg%n + + + + + + + + diff --git a/kamon-core/src/main/scala/kamon/TraceContext.scala b/kamon-core/src/main/scala/kamon/TraceContext.scala index 0dfc1630..155b7760 100644 --- a/kamon-core/src/main/scala/kamon/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/TraceContext.scala @@ -9,13 +9,7 @@ import kamon.newrelic.NewRelicReporting import kamon.trace.UowTracing.Start // TODO: Decide if we need or not an ID, generating it takes time and it doesn't seem necessary. -case class TraceContext(id: Long, entries: ActorRef, userContext: Option[Any] = None) { - //implicit val timeout = Timeout(30, TimeUnit.SECONDS) - implicit val as = Kamon.actorSystem.dispatcher - - def append(entry: TraceEntry) = entries ! entry - def close = entries ! "Close" // TODO type this thing!. -} +case class TraceContext(id: Long, tracer: ActorRef, uow: String = "", userContext: Option[Any] = None) object TraceContext { val reporter = Kamon.actorSystem.actorOf(Props[NewRelicReporting]) diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingInstrumentation.scala new file mode 100644 index 00000000..3cf0c6fc --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingInstrumentation.scala @@ -0,0 +1,32 @@ +package kamon.instrumentation + +import org.aspectj.lang.annotation.{Around, Pointcut, DeclareMixin, Aspect} +import kamon.{Tracer, TraceContext} +import org.aspectj.lang.ProceedingJoinPoint +import org.slf4j.MDC + + +@Aspect +class ActorLoggingInstrumentation { + + + @DeclareMixin("akka.event.Logging.LogEvent+") + def traceContextMixin: ContextAware = new ContextAware { + def traceContext: Option[TraceContext] = Tracer.context() + } + + @Pointcut("execution(* akka.event.slf4j.Slf4jLogger.withMdc(..)) && args(logSource, logEvent, logStatement)") + def withMdcInvocation(logSource: String, logEvent: ContextAware, logStatement: () => _): Unit = {} + + @Around("withMdcInvocation(logSource, logEvent, logStatement)") + def putTraceContextInMDC(pjp: ProceedingJoinPoint, logSource: String, logEvent: ContextAware, logStatement: () => _): Unit = { + logEvent.traceContext match { + case Some(ctx) => + MDC.put("uow", ctx.uow) + pjp.proceed() + MDC.remove("uow") + + case None => pjp.proceed() + } + } +} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala index 32eabe71..5117e7e7 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala @@ -32,7 +32,7 @@ class SprayServerInstrumentation { Tracer.start val discard = openRequest.asInstanceOf[ContextAware].traceContext - Tracer.context().map(_.entries ! Rename(request.uri.path.toString())) + Tracer.context().map(_.tracer ! Rename(request.uri.path.toString())) } @Pointcut("execution(* spray.can.server.OpenRequestComponent$DefaultOpenRequest.handleResponseEndAndReturnNextOpenRequest(..)) && target(openRequest)") @@ -42,7 +42,7 @@ class SprayServerInstrumentation { def afterFinishingRequest(openRequest: OpenRequest): Unit = { val original = openRequest.asInstanceOf[ContextAware].traceContext - Tracer.context().map(_.entries ! Finish()) + Tracer.context().map(_.tracer ! Finish()) if(Tracer.context() != original) { println(s"OMG DIFFERENT Original: [${original}] - Came in: [${Tracer.context}]") diff --git a/kamon-core/src/main/scala/kamon/trace/UowDirectives.scala b/kamon-core/src/main/scala/kamon/trace/UowDirectives.scala new file mode 100644 index 00000000..392f53b8 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/UowDirectives.scala @@ -0,0 +1,28 @@ +package kamon.trace + +import spray.routing.directives.BasicDirectives +import spray.routing._ +import kamon.Tracer +import java.util.concurrent.atomic.AtomicLong +import scala.util.Try +import java.net.InetAddress + +trait UowDirectives extends BasicDirectives { + def uow: Directive0 = mapRequest { request => + val uowHeader = request.headers.find(_.name == "X-UOW") + + val generatedUow = uowHeader.map(_.value).getOrElse(UowDirectives.newUow) + Tracer.set(Tracer.context().getOrElse(Tracer.newTraceContext()).copy(uow = generatedUow)) + + request + } +} + +object UowDirectives { + val uowCounter = new AtomicLong + + val hostnamePrefix = Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-localhost") + + def newUow = "%s-%s".format(hostnamePrefix, uowCounter.incrementAndGet()) + +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/test/SimpleRequestProcessor.scala b/kamon-core/src/main/scala/test/SimpleRequestProcessor.scala index 5b216b39..7d4cec52 100644 --- a/kamon-core/src/main/scala/test/SimpleRequestProcessor.scala +++ b/kamon-core/src/main/scala/test/SimpleRequestProcessor.scala @@ -6,8 +6,9 @@ import spray.routing.SimpleRoutingApp import akka.util.Timeout import spray.httpx.RequestBuilding import scala.concurrent.{Await, Future} +import kamon.trace.UowDirectives -object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuilding { +object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuilding with UowDirectives { import scala.concurrent.duration._ import spray.client.pipelining._ import akka.pattern.ask @@ -28,11 +29,13 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil } } ~ path("reply" / Segment) { reqID => - complete { - if (Tracer.context().isEmpty) - println("ROUTE NO CONTEXT") + uow { + complete { + if (Tracer.context().isEmpty) + println("ROUTE NO CONTEXT") - (replier ? reqID).mapTo[String] + (replier ? reqID).mapTo[String] + } } } ~ path("ok") { @@ -78,6 +81,7 @@ class Replier extends Actor with ActorLogging { if(Tracer.context.isEmpty) log.warning("PROCESSING A MESSAGE WITHOUT CONTEXT") + log.info("Processing at the Replier") sender ! anything } } diff --git a/kamon-uow/src/main/scala/kamon/logging/UowActorLogging.scala b/kamon-uow/src/main/scala/kamon/logging/UowActorLogging.scala deleted file mode 100644 index 784bd674..00000000 --- a/kamon-uow/src/main/scala/kamon/logging/UowActorLogging.scala +++ /dev/null @@ -1,115 +0,0 @@ -package kamon.logging - -import akka.actor.{ActorSystem, Actor} -import kamon.{Tracer} -import akka.event.{LoggingBus, LogSource, LoggingAdapter} -import akka.event.Logging._ -import akka.event.slf4j.{Logger, SLF4JLogging} -import akka.event.Logging.Info -import akka.event.Logging.Warning -import akka.event.Logging.Error -import akka.event.Logging.Debug -import org.slf4j.MDC -import akka.util.Helpers - -trait UowActorLogging { - this: Actor => - - val log = { - val (str, clazz) = LogSource(this, context.system) - new ExtendedBusLogging(context.system.eventStream, str, clazz) - } -} - -trait UowLogging { - self: Any => - def system: ActorSystem - - val log = { - val (str, clazz) = LogSource(self.getClass, system) - new ExtendedBusLogging(system.eventStream, str, clazz) - } -} - -class ExtendedBusLogging(val bus: LoggingBus, val logSource: String, val logClass: Class[_]) extends LoggingAdapter { - - import akka.event.Logging._ - - def isErrorEnabled = bus.logLevel >= ErrorLevel - def isWarningEnabled = bus.logLevel >= WarningLevel - def isInfoEnabled = bus.logLevel >= InfoLevel - def isDebugEnabled = bus.logLevel >= DebugLevel - - def currentUow: String = Tracer.context().flatMap(_.userContext).map(_.toString).getOrElse("") - def extras = Map("uow" -> currentUow) - - protected def notifyError(message: String): Unit = bus.publish(Error(logSource, logClass, RichLogEvent(message, extras))) - protected def notifyError(cause: Throwable, message: String): Unit = bus.publish(Error(cause, logSource, logClass, RichLogEvent(message, extras))) - protected def notifyWarning(message: String): Unit = bus.publish(Warning(logSource, logClass, RichLogEvent(message, extras))) - protected def notifyInfo(message: String): Unit = bus.publish(Info(logSource, logClass, RichLogEvent(message, extras))) - protected def notifyDebug(message: String): Unit = bus.publish(Debug(logSource, logClass, RichLogEvent(message, extras))) -} - -case class RichLogEvent(message: String, extras: Map[String, Any]) - - - -class ExtendedSlf4jLogger extends Actor with SLF4JLogging { - - val mdcThreadAttributeName = "sourceThread" - val mdcAkkaSourceAttributeName = "akkaSource" - val mdcAkkaTimestamp = "akkaTimestamp" - - def receive = { - - case event @ Error(cause, logSource, logClass, message) ⇒ - withMdc(logSource, event) { - cause match { - case Error.NoCause | null ⇒ withRichEventProcessing(message) { Logger(logClass, logSource).error(if (message != null) message.toString else null) } - case cause ⇒ withRichEventProcessing(message) { Logger(logClass, logSource).error(if (message != null) message.toString else cause.getLocalizedMessage, cause) } - } - } - - case event @ Warning(logSource, logClass, message) ⇒ - withMdc(logSource, event) { withRichEventProcessing(message) { Logger(logClass, logSource).warn("{}", message.asInstanceOf[AnyRef]) } } - - case event @ Info(logSource, logClass, message) ⇒ - withMdc(logSource, event) { withRichEventProcessing(message) { Logger(logClass, logSource).info("{}", message.asInstanceOf[AnyRef]) } } - - case event @ Debug(logSource, logClass, message) ⇒ - withMdc(logSource, event) { withRichEventProcessing(message) { Logger(logClass, logSource).debug("{}", message.asInstanceOf[AnyRef]) } } - - case InitializeLogger(_) ⇒ - log.info("Slf4jLogger started") - sender ! LoggerInitialized - } - - def withRichEventProcessing(message: Any)(delegate: => Unit): Unit = message match { - case RichLogEvent(event, extras) => { - extras.foreach { case (k, v) => MDC.put(k, v.toString) } - delegate - MDC.clear() - } - case _ => delegate - } - - @inline - final def withMdc(logSource: String, logEvent: LogEvent)(logStatement: ⇒ Unit) { - MDC.put(mdcAkkaSourceAttributeName, logSource) - MDC.put(mdcThreadAttributeName, logEvent.thread.getName) - MDC.put(mdcAkkaTimestamp, formatTimestamp(logEvent.timestamp)) - try logStatement finally { - MDC.remove(mdcAkkaSourceAttributeName) - MDC.remove(mdcThreadAttributeName) - MDC.remove(mdcAkkaTimestamp) - } - } - - /** - * Override this method to provide a differently formatted timestamp - * @param timestamp a "currentTimeMillis"-obtained timestamp - * @return the given timestamp as a UTC String - */ - protected def formatTimestamp(timestamp: Long): String = - Helpers.currentTimeMillisToUTCString(timestamp) -} diff --git a/kamon-uow/src/main/scala/kamon/logging/UowDirectives.scala b/kamon-uow/src/main/scala/kamon/logging/UowDirectives.scala deleted file mode 100644 index 0b54cedc..00000000 --- a/kamon-uow/src/main/scala/kamon/logging/UowDirectives.scala +++ /dev/null @@ -1,30 +0,0 @@ -package kamon.logging - -import java.util.concurrent.atomic.AtomicLong -import spray.routing.Directive0 -import spray.routing.directives.BasicDirectives -import java.net.InetAddress -import scala.util.Try -import kamon.{Tracer, Kamon} - -trait UowDirectives extends BasicDirectives { - def uow: Directive0 = mapRequest { request => - val uowHeader = request.headers.find(_.name == "X-UOW") - - val generatedUow = uowHeader.map(_.value).orElse(Some(UowDirectives.newUow)) - println("Generated UOW: "+generatedUow) - Tracer.set(Tracer.context().getOrElse(Tracer.newTraceContext()).copy(userContext = generatedUow)) - - - request - } -} - -object UowDirectives { - val uowCounter = new AtomicLong - - val hostnamePrefix = Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-localhost") - - def newUow = "%s-%s".format(hostnamePrefix, uowCounter.incrementAndGet()) - -} diff --git a/project/Build.scala b/project/Build.scala index 1e5c9a2f..022ff787 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -8,7 +8,7 @@ object Build extends Build { import Dependencies._ lazy val root = Project("root", file(".")) - .aggregate(kamonCore, kamonUow, kamonDashboard) + .aggregate(kamonCore, kamonDashboard) .settings(basicSettings: _*) .settings( publish := (), @@ -23,16 +23,11 @@ object Build extends Build { .settings( libraryDependencies ++= - compile(akkaActor, akkaAgent, aspectJ, aspectJWeaver, metrics, newrelic, sprayJson) ++ - provided(sprayCan, sprayClient, sprayRouting) ++ + compile(akkaActor, akkaAgent, aspectJ, aspectJWeaver, metrics, newrelic, sprayJson, logback, akkaSlf4j) ++ + compile(sprayCan, sprayClient, sprayRouting) ++ test(scalatest, akkaTestKit, sprayTestkit)) //.dependsOn(kamonDashboard) - lazy val kamonUow = Project("kamon-uow", file("kamon-uow")) - .settings(basicSettings: _*) - .settings(libraryDependencies ++= compile(akkaActor, akkaSlf4j, sprayRouting)) - .dependsOn(kamonCore) - lazy val kamonDashboard = Project("kamon-dashboard", file("kamon-dashboard")) .settings(basicSettings: _*) .settings(libraryDependencies ++= compile(akkaActor, akkaSlf4j, sprayRouting, sprayCan, sprayJson)) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 66852801..1a49d1d4 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -21,12 +21,13 @@ object Dependencies { val akkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % akkaVersion val akkaTestKit = "com.typesafe.akka" %% "akka-testkit" % akkaVersion val scalatest = "org.scalatest" % "scalatest_2.10" % "2.0.M6-SNAP22" - val logback = "ch.qos.logback" % "logback-classic" % "1.0.10" + val logback = "ch.qos.logback" % "logback-classic" % "1.0.13" val aspectJ = "org.aspectj" % "aspectjrt" % "1.7.2" val aspectJWeaver = "org.aspectj" % "aspectjweaver" % "1.7.2" val metrics = "com.codahale.metrics" % "metrics-core" % "3.0.0" val newrelic = "com.newrelic.agent.java" % "newrelic-api" % "2.19.0" + def compile (deps: ModuleID*): Seq[ModuleID] = deps map (_ % "compile") def provided (deps: ModuleID*): Seq[ModuleID] = deps map (_ % "provided") def test (deps: ModuleID*): Seq[ModuleID] = deps map (_ % "test") -- cgit v1.2.3