aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnak <ivantopo@gmail.com>2013-08-26 19:24:56 -0300
committerIvan Topolnak <ivantopo@gmail.com>2013-08-26 19:24:56 -0300
commit902da6b5410325411a0473f923632fa92d39838e (patch)
tree498b0eff9e57b5cc5ff60558fa6092c84736a8bd
parentb03b74291b60e9882e815d7823221eacbbf3cfca (diff)
downloadKamon-902da6b5410325411a0473f923632fa92d39838e.tar.gz
Kamon-902da6b5410325411a0473f923632fa92d39838e.tar.bz2
Kamon-902da6b5410325411a0473f923632fa92d39838e.zip
Some sort of basic logging with UOW
-rw-r--r--kamon-core/src/main/resources/META-INF/aop.xml2
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala10
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala4
-rw-r--r--kamon-dashboard/src/main/scala/kamon/dashboard/DashboardService.scala2
-rw-r--r--kamon-uow/src/main/scala/kamon/logging/UowActorLogging.scala111
-rw-r--r--kamon-uow/src/main/scala/kamon/logging/UowDirectives.scala4
-rw-r--r--project/Build.scala2
-rw-r--r--project/Dependencies.scala1
8 files changed, 116 insertions, 20 deletions
diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop.xml
index 0f1895ec..0f427611 100644
--- a/kamon-core/src/main/resources/META-INF/aop.xml
+++ b/kamon-core/src/main/resources/META-INF/aop.xml
@@ -19,7 +19,7 @@
<!-- <aspect name="kamon.instrumentation.ExecutorServiceFactoryProviderInstrumentation"/>
<aspect name="kamon.instrumentation.NamedExecutorServiceFactoryDelegateInstrumentation"/>-->
<aspect name="kamon.instrumentation.ActorSystemInstrumentation"/>
- <aspect name ="kamon.instrumentation.ForkJoinPoolInstrumentation"/>
+ <!--<aspect name ="kamon.instrumentation.ForkJoinPoolInstrumentation"/>-->
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
index 212eab2c..7d3e36ca 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
@@ -16,16 +16,18 @@ case class TraceableMessage(traceContext: Option[TraceContext], message: Any, ti
class ActorRefTellInstrumentation {
import ProceedingJoinPointPimp._
+ val t2 = Metrics.registry.timer("some" + "LATENCY")
+
@Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && !within(akka.event.Logging.StandardOutLogger) && !within(akka.pattern.PromiseActorRef) && !within(akka.actor.DeadLetterActorRef) && target(actor) && args(message, sender)")
def sendingMessageToActorRef(actor: ActorRef, message: Any, sender: ActorRef) = {}
@Around("sendingMessageToActorRef(actor, message, sender)")
def around(pjp: ProceedingJoinPoint, actor: ActorRef, message: Any, sender: ActorRef): Unit = {
- val actorName = MetricDirectory.nameForActor(actor)
- val t = Metrics.registry.timer(actorName + "LATENCY")
+ //val actorName = MetricDirectory.nameForActor(actor)
+ //val t = Metrics.registry.timer(actorName + "LATENCY")
//println(s"Wrapped message from [$sender] to [$actor] with content: [$message]")
- pjp.proceedWithTarget(actor, TraceableMessage(Tracer.context, message, t.time()), sender)
+ pjp.proceedWithTarget(actor, TraceableMessage(Tracer.context, message, t2.time()), sender)
}
}
@@ -66,7 +68,7 @@ class ActorCellInvokeInstrumentation {
//println("ENVELOPE --------------------->"+envelope)
envelope match {
case Envelope(TraceableMessage(ctx, msg, timer), sender) => {
- timer.stop()
+ //timer.stop()
val originalEnvelope = envelope.copy(message = msg)
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala
index c21502ac..6a1e291f 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala
@@ -48,12 +48,12 @@ class MonitoredMessageQueue(val delegate: MessageQueue, val queueSizeHistogram:
def enqueue(receiver: ActorRef, handle: Envelope) = {
delegate.enqueue(receiver, handle)
- queueSizeHistogram.update(numberOfMessages)
+ //queueSizeHistogram.update(numberOfMessages)
}
def dequeue(): Envelope = {
val envelope = delegate.dequeue()
- queueSizeHistogram.update(numberOfMessages)
+ //queueSizeHistogram.update(numberOfMessages)
envelope
}
diff --git a/kamon-dashboard/src/main/scala/kamon/dashboard/DashboardService.scala b/kamon-dashboard/src/main/scala/kamon/dashboard/DashboardService.scala
index 75dcf36c..904329f2 100644
--- a/kamon-dashboard/src/main/scala/kamon/dashboard/DashboardService.scala
+++ b/kamon-dashboard/src/main/scala/kamon/dashboard/DashboardService.scala
@@ -81,7 +81,7 @@ trait DashboardMetricsApi extends HttpService with SprayJsonSupport{
new TotalMessages(numberOfMessages,dataHolders.size, dataHolders)
}
- def timerMetrics = registry.getTimers(metricFilter).asScala.map{ case(name, timer) => TimerDataHolder(name, timer.getCount, timer.getSnapshot.get99thPercentile())}.toList
+ def timerMetrics = registry.getTimers(metricFilter).asScala.map{ case(name, timer) => TimerDataHolder(name, timer.getSnapshot.getMean.toLong, timer.getSnapshot.get99thPercentile())}.toList
val dashboardMetricsApi =
pathPrefix("metrics") {
diff --git a/kamon-uow/src/main/scala/kamon/logging/UowActorLogging.scala b/kamon-uow/src/main/scala/kamon/logging/UowActorLogging.scala
index 64c5241a..784bd674 100644
--- a/kamon-uow/src/main/scala/kamon/logging/UowActorLogging.scala
+++ b/kamon-uow/src/main/scala/kamon/logging/UowActorLogging.scala
@@ -1,24 +1,115 @@
package kamon.logging
-import akka.actor.Actor
-import kamon.{Tracer, Kamon}
-import akka.event.{BusLogging, LogSource, LoggingAdapter}
+import akka.actor.{ActorSystem, Actor}
+import kamon.{Tracer}
+import akka.event.{LoggingBus, LogSource, LoggingAdapter}
+import akka.event.Logging._
+import akka.event.slf4j.{Logger, SLF4JLogging}
+import akka.event.Logging.Info
+import akka.event.Logging.Warning
+import akka.event.Logging.Error
+import akka.event.Logging.Debug
+import org.slf4j.MDC
+import akka.util.Helpers
trait UowActorLogging {
this: Actor =>
val log = {
val (str, clazz) = LogSource(this, context.system)
- new BusLogging(context.system.eventStream, str, clazz) with UowLoggingAdapter
+ new ExtendedBusLogging(context.system.eventStream, str, clazz)
}
+}
+
+trait UowLogging {
+ self: Any =>
+ def system: ActorSystem
+ val log = {
+ val (str, clazz) = LogSource(self.getClass, system)
+ new ExtendedBusLogging(system.eventStream, str, clazz)
+ }
}
-trait UowLoggingAdapter extends LoggingAdapter {
+class ExtendedBusLogging(val bus: LoggingBus, val logSource: String, val logClass: Class[_]) extends LoggingAdapter {
+
+ import akka.event.Logging._
+
+ def isErrorEnabled = bus.logLevel >= ErrorLevel
+ def isWarningEnabled = bus.logLevel >= WarningLevel
+ def isInfoEnabled = bus.logLevel >= InfoLevel
+ def isDebugEnabled = bus.logLevel >= DebugLevel
+
def currentUow: String = Tracer.context().flatMap(_.userContext).map(_.toString).getOrElse("")
- protected abstract override def notifyWarning(message: String) = super.notifyWarning(s"[${currentUow}] - $message")
- protected abstract override def notifyInfo(message: String) = super.notifyInfo( s"[${currentUow}] - $message")
- protected abstract override def notifyDebug(message: String) = super.notifyDebug( s"[${currentUow}] - $message")
- protected abstract override def notifyError(message: String) = super.notifyError( s"[${currentUow}] - $message")
- protected abstract override def notifyError(cause: Throwable, message: String) = super.notifyError(cause, s"[${currentUow}] - $message")
+ def extras = Map("uow" -> currentUow)
+
+ protected def notifyError(message: String): Unit = bus.publish(Error(logSource, logClass, RichLogEvent(message, extras)))
+ protected def notifyError(cause: Throwable, message: String): Unit = bus.publish(Error(cause, logSource, logClass, RichLogEvent(message, extras)))
+ protected def notifyWarning(message: String): Unit = bus.publish(Warning(logSource, logClass, RichLogEvent(message, extras)))
+ protected def notifyInfo(message: String): Unit = bus.publish(Info(logSource, logClass, RichLogEvent(message, extras)))
+ protected def notifyDebug(message: String): Unit = bus.publish(Debug(logSource, logClass, RichLogEvent(message, extras)))
+}
+
+case class RichLogEvent(message: String, extras: Map[String, Any])
+
+
+
+class ExtendedSlf4jLogger extends Actor with SLF4JLogging {
+
+ val mdcThreadAttributeName = "sourceThread"
+ val mdcAkkaSourceAttributeName = "akkaSource"
+ val mdcAkkaTimestamp = "akkaTimestamp"
+
+ def receive = {
+
+ case event @ Error(cause, logSource, logClass, message) ⇒
+ withMdc(logSource, event) {
+ cause match {
+ case Error.NoCause | null ⇒ withRichEventProcessing(message) { Logger(logClass, logSource).error(if (message != null) message.toString else null) }
+ case cause ⇒ withRichEventProcessing(message) { Logger(logClass, logSource).error(if (message != null) message.toString else cause.getLocalizedMessage, cause) }
+ }
+ }
+
+ case event @ Warning(logSource, logClass, message) ⇒
+ withMdc(logSource, event) { withRichEventProcessing(message) { Logger(logClass, logSource).warn("{}", message.asInstanceOf[AnyRef]) } }
+
+ case event @ Info(logSource, logClass, message) ⇒
+ withMdc(logSource, event) { withRichEventProcessing(message) { Logger(logClass, logSource).info("{}", message.asInstanceOf[AnyRef]) } }
+
+ case event @ Debug(logSource, logClass, message) ⇒
+ withMdc(logSource, event) { withRichEventProcessing(message) { Logger(logClass, logSource).debug("{}", message.asInstanceOf[AnyRef]) } }
+
+ case InitializeLogger(_) ⇒
+ log.info("Slf4jLogger started")
+ sender ! LoggerInitialized
+ }
+
+ def withRichEventProcessing(message: Any)(delegate: => Unit): Unit = message match {
+ case RichLogEvent(event, extras) => {
+ extras.foreach { case (k, v) => MDC.put(k, v.toString) }
+ delegate
+ MDC.clear()
+ }
+ case _ => delegate
+ }
+
+ @inline
+ final def withMdc(logSource: String, logEvent: LogEvent)(logStatement: ⇒ Unit) {
+ MDC.put(mdcAkkaSourceAttributeName, logSource)
+ MDC.put(mdcThreadAttributeName, logEvent.thread.getName)
+ MDC.put(mdcAkkaTimestamp, formatTimestamp(logEvent.timestamp))
+ try logStatement finally {
+ MDC.remove(mdcAkkaSourceAttributeName)
+ MDC.remove(mdcThreadAttributeName)
+ MDC.remove(mdcAkkaTimestamp)
+ }
+ }
+
+ /**
+ * Override this method to provide a differently formatted timestamp
+ * @param timestamp a "currentTimeMillis"-obtained timestamp
+ * @return the given timestamp as a UTC String
+ */
+ protected def formatTimestamp(timestamp: Long): String =
+ Helpers.currentTimeMillisToUTCString(timestamp)
}
diff --git a/kamon-uow/src/main/scala/kamon/logging/UowDirectives.scala b/kamon-uow/src/main/scala/kamon/logging/UowDirectives.scala
index 58473f13..6d38f3d7 100644
--- a/kamon-uow/src/main/scala/kamon/logging/UowDirectives.scala
+++ b/kamon-uow/src/main/scala/kamon/logging/UowDirectives.scala
@@ -9,7 +9,9 @@ import kamon.{Tracer, Kamon}
trait UowDirectives extends BasicDirectives {
def uow: Directive0 = mapRequest { request =>
- val generatedUow = Some(UowDirectives.newUow)
+ val uowHeader = request.headers.find(_.name == "X-UOW")
+
+ val generatedUow = uowHeader.map(_.value).orElse(Some(UowDirectives.newUow))
println("Generated UOW: "+generatedUow)
Tracer.set(Tracer.newTraceContext().copy(userContext = generatedUow))
diff --git a/project/Build.scala b/project/Build.scala
index 879cc1c0..0141540b 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -19,7 +19,7 @@ object Build extends Build {
.settings(
libraryDependencies ++=
- compile(akkaActor, akkaAgent, sprayCan, sprayClient, sprayRouting, sprayServlet, aspectJ, metrics, sprayJson) ++
+ compile(akkaActor, akkaAgent, sprayCan, sprayClient, sprayRouting, sprayServlet, aspectJ, aspectJWeaver, metrics, sprayJson) ++
test(scalatest, akkaTestKit, sprayTestkit))
//.dependsOn(kamonDashboard)
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 028004e1..b61f0979 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -24,6 +24,7 @@ object Dependencies {
val scalatest = "org.scalatest" % "scalatest_2.10" % "2.0.M6-SNAP22"
val logback = "ch.qos.logback" % "logback-classic" % "1.0.10"
val aspectJ = "org.aspectj" % "aspectjrt" % "1.7.2"
+ val aspectJWeaver = "org.aspectj" % "aspectjweaver" % "1.7.2"
val metrics = "com.codahale.metrics" % "metrics-core" % "3.0.0"
val newrelic = "com.newrelic.agent.java" % "newrelic-api" % "2.19.0"