diff options
author | Ivan Topolnak <ivantopo@gmail.com> | 2013-08-26 19:24:56 -0300 |
---|---|---|
committer | Ivan Topolnak <ivantopo@gmail.com> | 2013-08-26 19:24:56 -0300 |
commit | 902da6b5410325411a0473f923632fa92d39838e (patch) | |
tree | 498b0eff9e57b5cc5ff60558fa6092c84736a8bd | |
parent | b03b74291b60e9882e815d7823221eacbbf3cfca (diff) | |
download | Kamon-902da6b5410325411a0473f923632fa92d39838e.tar.gz Kamon-902da6b5410325411a0473f923632fa92d39838e.tar.bz2 Kamon-902da6b5410325411a0473f923632fa92d39838e.zip |
Some sort of basic logging with UOW
8 files changed, 116 insertions, 20 deletions
diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop.xml index 0f1895ec..0f427611 100644 --- a/kamon-core/src/main/resources/META-INF/aop.xml +++ b/kamon-core/src/main/resources/META-INF/aop.xml @@ -19,7 +19,7 @@ <!-- <aspect name="kamon.instrumentation.ExecutorServiceFactoryProviderInstrumentation"/> <aspect name="kamon.instrumentation.NamedExecutorServiceFactoryDelegateInstrumentation"/>--> <aspect name="kamon.instrumentation.ActorSystemInstrumentation"/> - <aspect name ="kamon.instrumentation.ForkJoinPoolInstrumentation"/> + <!--<aspect name ="kamon.instrumentation.ForkJoinPoolInstrumentation"/>--> diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala index 212eab2c..7d3e36ca 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala @@ -16,16 +16,18 @@ case class TraceableMessage(traceContext: Option[TraceContext], message: Any, ti class ActorRefTellInstrumentation { import ProceedingJoinPointPimp._ + val t2 = Metrics.registry.timer("some" + "LATENCY") + @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && !within(akka.event.Logging.StandardOutLogger) && !within(akka.pattern.PromiseActorRef) && !within(akka.actor.DeadLetterActorRef) && target(actor) && args(message, sender)") def sendingMessageToActorRef(actor: ActorRef, message: Any, sender: ActorRef) = {} @Around("sendingMessageToActorRef(actor, message, sender)") def around(pjp: ProceedingJoinPoint, actor: ActorRef, message: Any, sender: ActorRef): Unit = { - val actorName = MetricDirectory.nameForActor(actor) - val t = Metrics.registry.timer(actorName + "LATENCY") + //val actorName = MetricDirectory.nameForActor(actor) + //val t = Metrics.registry.timer(actorName + "LATENCY") //println(s"Wrapped message from [$sender] to [$actor] with content: [$message]") - pjp.proceedWithTarget(actor, TraceableMessage(Tracer.context, message, t.time()), sender) + pjp.proceedWithTarget(actor, TraceableMessage(Tracer.context, message, t2.time()), sender) } } @@ -66,7 +68,7 @@ class ActorCellInvokeInstrumentation { //println("ENVELOPE --------------------->"+envelope) envelope match { case Envelope(TraceableMessage(ctx, msg, timer), sender) => { - timer.stop() + //timer.stop() val originalEnvelope = envelope.copy(message = msg) diff --git a/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala index c21502ac..6a1e291f 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala @@ -48,12 +48,12 @@ class MonitoredMessageQueue(val delegate: MessageQueue, val queueSizeHistogram: def enqueue(receiver: ActorRef, handle: Envelope) = { delegate.enqueue(receiver, handle) - queueSizeHistogram.update(numberOfMessages) + //queueSizeHistogram.update(numberOfMessages) } def dequeue(): Envelope = { val envelope = delegate.dequeue() - queueSizeHistogram.update(numberOfMessages) + //queueSizeHistogram.update(numberOfMessages) envelope } diff --git a/kamon-dashboard/src/main/scala/kamon/dashboard/DashboardService.scala b/kamon-dashboard/src/main/scala/kamon/dashboard/DashboardService.scala index 75dcf36c..904329f2 100644 --- a/kamon-dashboard/src/main/scala/kamon/dashboard/DashboardService.scala +++ b/kamon-dashboard/src/main/scala/kamon/dashboard/DashboardService.scala @@ -81,7 +81,7 @@ trait DashboardMetricsApi extends HttpService with SprayJsonSupport{ new TotalMessages(numberOfMessages,dataHolders.size, dataHolders) } - def timerMetrics = registry.getTimers(metricFilter).asScala.map{ case(name, timer) => TimerDataHolder(name, timer.getCount, timer.getSnapshot.get99thPercentile())}.toList + def timerMetrics = registry.getTimers(metricFilter).asScala.map{ case(name, timer) => TimerDataHolder(name, timer.getSnapshot.getMean.toLong, timer.getSnapshot.get99thPercentile())}.toList val dashboardMetricsApi = pathPrefix("metrics") { diff --git a/kamon-uow/src/main/scala/kamon/logging/UowActorLogging.scala b/kamon-uow/src/main/scala/kamon/logging/UowActorLogging.scala index 64c5241a..784bd674 100644 --- a/kamon-uow/src/main/scala/kamon/logging/UowActorLogging.scala +++ b/kamon-uow/src/main/scala/kamon/logging/UowActorLogging.scala @@ -1,24 +1,115 @@ package kamon.logging -import akka.actor.Actor -import kamon.{Tracer, Kamon} -import akka.event.{BusLogging, LogSource, LoggingAdapter} +import akka.actor.{ActorSystem, Actor} +import kamon.{Tracer} +import akka.event.{LoggingBus, LogSource, LoggingAdapter} +import akka.event.Logging._ +import akka.event.slf4j.{Logger, SLF4JLogging} +import akka.event.Logging.Info +import akka.event.Logging.Warning +import akka.event.Logging.Error +import akka.event.Logging.Debug +import org.slf4j.MDC +import akka.util.Helpers trait UowActorLogging { this: Actor => val log = { val (str, clazz) = LogSource(this, context.system) - new BusLogging(context.system.eventStream, str, clazz) with UowLoggingAdapter + new ExtendedBusLogging(context.system.eventStream, str, clazz) } +} + +trait UowLogging { + self: Any => + def system: ActorSystem + val log = { + val (str, clazz) = LogSource(self.getClass, system) + new ExtendedBusLogging(system.eventStream, str, clazz) + } } -trait UowLoggingAdapter extends LoggingAdapter { +class ExtendedBusLogging(val bus: LoggingBus, val logSource: String, val logClass: Class[_]) extends LoggingAdapter { + + import akka.event.Logging._ + + def isErrorEnabled = bus.logLevel >= ErrorLevel + def isWarningEnabled = bus.logLevel >= WarningLevel + def isInfoEnabled = bus.logLevel >= InfoLevel + def isDebugEnabled = bus.logLevel >= DebugLevel + def currentUow: String = Tracer.context().flatMap(_.userContext).map(_.toString).getOrElse("") - protected abstract override def notifyWarning(message: String) = super.notifyWarning(s"[${currentUow}] - $message") - protected abstract override def notifyInfo(message: String) = super.notifyInfo( s"[${currentUow}] - $message") - protected abstract override def notifyDebug(message: String) = super.notifyDebug( s"[${currentUow}] - $message") - protected abstract override def notifyError(message: String) = super.notifyError( s"[${currentUow}] - $message") - protected abstract override def notifyError(cause: Throwable, message: String) = super.notifyError(cause, s"[${currentUow}] - $message") + def extras = Map("uow" -> currentUow) + + protected def notifyError(message: String): Unit = bus.publish(Error(logSource, logClass, RichLogEvent(message, extras))) + protected def notifyError(cause: Throwable, message: String): Unit = bus.publish(Error(cause, logSource, logClass, RichLogEvent(message, extras))) + protected def notifyWarning(message: String): Unit = bus.publish(Warning(logSource, logClass, RichLogEvent(message, extras))) + protected def notifyInfo(message: String): Unit = bus.publish(Info(logSource, logClass, RichLogEvent(message, extras))) + protected def notifyDebug(message: String): Unit = bus.publish(Debug(logSource, logClass, RichLogEvent(message, extras))) +} + +case class RichLogEvent(message: String, extras: Map[String, Any]) + + + +class ExtendedSlf4jLogger extends Actor with SLF4JLogging { + + val mdcThreadAttributeName = "sourceThread" + val mdcAkkaSourceAttributeName = "akkaSource" + val mdcAkkaTimestamp = "akkaTimestamp" + + def receive = { + + case event @ Error(cause, logSource, logClass, message) ⇒ + withMdc(logSource, event) { + cause match { + case Error.NoCause | null ⇒ withRichEventProcessing(message) { Logger(logClass, logSource).error(if (message != null) message.toString else null) } + case cause ⇒ withRichEventProcessing(message) { Logger(logClass, logSource).error(if (message != null) message.toString else cause.getLocalizedMessage, cause) } + } + } + + case event @ Warning(logSource, logClass, message) ⇒ + withMdc(logSource, event) { withRichEventProcessing(message) { Logger(logClass, logSource).warn("{}", message.asInstanceOf[AnyRef]) } } + + case event @ Info(logSource, logClass, message) ⇒ + withMdc(logSource, event) { withRichEventProcessing(message) { Logger(logClass, logSource).info("{}", message.asInstanceOf[AnyRef]) } } + + case event @ Debug(logSource, logClass, message) ⇒ + withMdc(logSource, event) { withRichEventProcessing(message) { Logger(logClass, logSource).debug("{}", message.asInstanceOf[AnyRef]) } } + + case InitializeLogger(_) ⇒ + log.info("Slf4jLogger started") + sender ! LoggerInitialized + } + + def withRichEventProcessing(message: Any)(delegate: => Unit): Unit = message match { + case RichLogEvent(event, extras) => { + extras.foreach { case (k, v) => MDC.put(k, v.toString) } + delegate + MDC.clear() + } + case _ => delegate + } + + @inline + final def withMdc(logSource: String, logEvent: LogEvent)(logStatement: ⇒ Unit) { + MDC.put(mdcAkkaSourceAttributeName, logSource) + MDC.put(mdcThreadAttributeName, logEvent.thread.getName) + MDC.put(mdcAkkaTimestamp, formatTimestamp(logEvent.timestamp)) + try logStatement finally { + MDC.remove(mdcAkkaSourceAttributeName) + MDC.remove(mdcThreadAttributeName) + MDC.remove(mdcAkkaTimestamp) + } + } + + /** + * Override this method to provide a differently formatted timestamp + * @param timestamp a "currentTimeMillis"-obtained timestamp + * @return the given timestamp as a UTC String + */ + protected def formatTimestamp(timestamp: Long): String = + Helpers.currentTimeMillisToUTCString(timestamp) } diff --git a/kamon-uow/src/main/scala/kamon/logging/UowDirectives.scala b/kamon-uow/src/main/scala/kamon/logging/UowDirectives.scala index 58473f13..6d38f3d7 100644 --- a/kamon-uow/src/main/scala/kamon/logging/UowDirectives.scala +++ b/kamon-uow/src/main/scala/kamon/logging/UowDirectives.scala @@ -9,7 +9,9 @@ import kamon.{Tracer, Kamon} trait UowDirectives extends BasicDirectives { def uow: Directive0 = mapRequest { request => - val generatedUow = Some(UowDirectives.newUow) + val uowHeader = request.headers.find(_.name == "X-UOW") + + val generatedUow = uowHeader.map(_.value).orElse(Some(UowDirectives.newUow)) println("Generated UOW: "+generatedUow) Tracer.set(Tracer.newTraceContext().copy(userContext = generatedUow)) diff --git a/project/Build.scala b/project/Build.scala index 879cc1c0..0141540b 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -19,7 +19,7 @@ object Build extends Build { .settings( libraryDependencies ++= - compile(akkaActor, akkaAgent, sprayCan, sprayClient, sprayRouting, sprayServlet, aspectJ, metrics, sprayJson) ++ + compile(akkaActor, akkaAgent, sprayCan, sprayClient, sprayRouting, sprayServlet, aspectJ, aspectJWeaver, metrics, sprayJson) ++ test(scalatest, akkaTestKit, sprayTestkit)) //.dependsOn(kamonDashboard) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 028004e1..b61f0979 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -24,6 +24,7 @@ object Dependencies { val scalatest = "org.scalatest" % "scalatest_2.10" % "2.0.M6-SNAP22" val logback = "ch.qos.logback" % "logback-classic" % "1.0.10" val aspectJ = "org.aspectj" % "aspectjrt" % "1.7.2" + val aspectJWeaver = "org.aspectj" % "aspectjweaver" % "1.7.2" val metrics = "com.codahale.metrics" % "metrics-core" % "3.0.0" val newrelic = "com.newrelic.agent.java" % "newrelic-api" % "2.19.0" |