diff options
Diffstat (limited to 'kamon-core')
30 files changed, 646 insertions, 635 deletions
diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop.xml index 0f427611..f13effb9 100644 --- a/kamon-core/src/main/resources/META-INF/aop.xml +++ b/kamon-core/src/main/resources/META-INF/aop.xml @@ -7,24 +7,26 @@ <aspects> - <aspect name="kamon.instrumentation.ActorRefTellInstrumentation"/> + <aspect name="kamon.instrumentation.EnvelopeTracingContext"/> <aspect name="kamon.instrumentation.ActorCellInvokeInstrumentation"/> - <aspect name="kamon.instrumentation.UnregisteredActorRefInstrumentation"/> <aspect name="kamon.instrumentation.RunnableInstrumentation" /> - <aspect name="kamon.instrumentation.MessageQueueInstrumentation" /> - - <aspect name="kamon.instrumentation.InceptionAspect"/> + <aspect name="kamon.instrumentation.SprayRequestContextTracing"/> + <aspect name="kamon.instrumentation.SprayOpenRequestContextTracing"/> + <aspect name = "kamon.instrumentation.SprayServerInstrumentation"/> + <aspect name="kamon.instrumentation.ActorSystemInstrumentation"/> + <aspect name="kamon.instrumentation.ActorLoggingInstrumentation"/> + <!--<aspect name="kamon.instrumentation.MessageQueueInstrumentation" />--> + <!--<aspect name="kamon.instrumentation.InceptionAspect"/>--> <!-- ExecutorService Instrumentation for Akka. --> <!-- <aspect name="kamon.instrumentation.ExecutorServiceFactoryProviderInstrumentation"/> <aspect name="kamon.instrumentation.NamedExecutorServiceFactoryDelegateInstrumentation"/>--> - <aspect name="kamon.instrumentation.ActorSystemInstrumentation"/> <!--<aspect name ="kamon.instrumentation.ForkJoinPoolInstrumentation"/>--> - <include within="*"/> <exclude within="javax..*"/> + <exclude within="com.newrelic..*"/> <exclude within="org.aspectj..*"/> <exclude within="scala..*"/> <exclude within="scalaz..*"/> diff --git a/kamon-core/src/main/resources/application.conf b/kamon-core/src/main/resources/application.conf deleted file mode 100644 index b4c8b7f6..00000000 --- a/kamon-core/src/main/resources/application.conf +++ /dev/null @@ -1,37 +0,0 @@ -akka { - loglevel = DEBUG - stdout-loglevel = DEBUG - - #extensions = ["kamon.dashboard.DashboardExtension"] - event-handlers = ["kamon.newrelic.NewRelicErrorLogger"] - - actor { - default-dispatcher { - fork-join-executor { - # Min number of threads to cap factor-based parallelism number to - parallelism-min = 2 - - # The parallelism factor is used to determine thread pool size using the - # following formula: ceil(available processors * factor). Resulting size - # is then bounded by the parallelism-min and parallelism-max values. - parallelism-factor = 3.0 - - # Max number of threads to cap factor-based parallelism number to - parallelism-max = 8 - } - - throughput = 100 - } - debug { - unhandled = on - } - } -} - - - - - - - - diff --git a/kamon-core/src/main/resources/logback.xml b/kamon-core/src/main/resources/logback.xml new file mode 100644 index 00000000..2ae1e3bd --- /dev/null +++ b/kamon-core/src/main/resources/logback.xml @@ -0,0 +1,12 @@ +<configuration scan="true"> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%date{HH:mm:ss.SSS} %-5level [%X{uow}][%X{requestId}] [%thread] %logger{55} - %msg%n</pattern> + </encoder> + </appender> + + <root level="debug"> + <appender-ref ref="STDOUT" /> + </root> + +</configuration> diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf new file mode 100644 index 00000000..29532595 --- /dev/null +++ b/kamon-core/src/main/resources/reference.conf @@ -0,0 +1,11 @@ +akka { + loggers = ["kamon.newrelic.NewRelicErrorLogger", "akka.event.slf4j.Slf4jLogger"] +} + + + + + + + + diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index 298f43eb..fb1b2393 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -5,6 +5,13 @@ import kamon.metric.{HistogramSnapshot, ActorSystemMetrics} import scala.concurrent.duration.FiniteDuration import com.newrelic.api.agent.NewRelic import scala.collection.concurrent.TrieMap +import kamon.instrumentation.{SimpleContextPassingInstrumentation, ActorInstrumentationConfiguration} +import scala.util.DynamicVariable + + +object Instrument { + val instrumentation: ActorInstrumentationConfiguration = new SimpleContextPassingInstrumentation +} object Kamon { implicit lazy val actorSystem = ActorSystem("kamon") @@ -19,27 +26,20 @@ object Kamon { def actorSystem(name: String): Option[ActorSystemMetrics] = actorSystems.get(name) } - val metricManager = actorSystem.actorOf(Props[MetricManager], "metric-manager") - val newrelicReporter = actorSystem.actorOf(Props[NewrelicReporterActor], "newrelic-reporter") + //val metricManager = actorSystem.actorOf(Props[MetricManager], "metric-manager") + //val newrelicReporter = actorSystem.actorOf(Props[NewrelicReporterActor], "newrelic-reporter") } object Tracer { - val ctx = new ThreadLocal[Option[TraceContext]] { - override def initialValue() = None - } + val traceContext = new DynamicVariable[Option[TraceContext]](None) - def context() = ctx.get() - def clear = ctx.remove() - def set(traceContext: TraceContext) = ctx.set(Some(traceContext)) - def start = set(newTraceContext) - def stop = ctx.get match { - case Some(context) => context.close - case None => - } + def context() = traceContext.value + def set(ctx: TraceContext) = traceContext.value = Some(ctx) + def start = set(newTraceContext) def newTraceContext(): TraceContext = TraceContext()(Kamon.actorSystem) } @@ -79,7 +79,7 @@ case class DispatcherMetrics(actorSystem: String, dispatcher: String, activeThre class NewrelicReporterActor extends Actor { import scala.concurrent.duration._ - Kamon.metricManager ! RegisterForAllDispatchers(5 seconds) + //Kamon.metricManager ! RegisterForAllDispatchers(5 seconds) def receive = { case DispatcherMetrics(actorSystem, dispatcher, activeThreads, poolSize, queueSize) => { diff --git a/kamon-core/src/main/scala/kamon/TraceContext.scala b/kamon-core/src/main/scala/kamon/TraceContext.scala index 62d7f57e..63cdb488 100644 --- a/kamon-core/src/main/scala/kamon/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/TraceContext.scala @@ -2,22 +2,26 @@ package kamon import java.util.UUID import akka.actor._ -import akka.agent.Agent -import java.util.concurrent.TimeUnit -import scala.util.{Failure, Success} -import akka.util.Timeout +import java.util.concurrent.atomic.AtomicLong +import kamon.trace.UowTraceAggregator +import scala.concurrent.duration._ +import kamon.newrelic.NewRelicReporting +import kamon.trace.UowTracing.Start +// TODO: Decide if we need or not an ID, generating it takes time and it doesn't seem necessary. +case class TraceContext(id: Long, tracer: ActorRef, uow: String = "", userContext: Option[Any] = None) -case class TraceContext(id: UUID, entries: ActorRef, userContext: Option[Any] = None) { - implicit val timeout = Timeout(30, TimeUnit.SECONDS) - implicit val as = Kamon.actorSystem.dispatcher +object TraceContext { + val reporter = Kamon.actorSystem.actorOf(Props[NewRelicReporting]) + val traceIdCounter = new AtomicLong - def append(entry: TraceEntry) = entries ! entry - def close = entries ! "Close" // TODO type this thing!. -} + def apply()(implicit system: ActorSystem) = { + val n = traceIdCounter.incrementAndGet() + val actor = system.actorOf(UowTraceAggregator.props(reporter, 30 seconds), s"tracer-${n}") + actor ! Start() -object TraceContext { - def apply()(implicit system: ActorSystem) = new TraceContext(UUID.randomUUID(), system.actorOf(Props[TraceAccumulator])) // TODO: Move to a kamon specific supervisor, like /user/kamon/tracer + new TraceContext(n, actor) // TODO: Move to a kamon specific supervisor, like /user/kamon/tracer + } } @@ -30,20 +34,10 @@ class TraceAccumulator extends Actor { trait TraceEntry - case class CodeBlockExecutionTime(name: String, begin: Long, end: Long) extends TraceEntry - - - case class TransactionTrace(id: UUID, start: Long, end: Long, entries: Seq[TraceEntry]) - - - - -object Collector { - -} +object Collector trait TraceEntryStorage { def store(entry: TraceEntry): Boolean diff --git a/kamon-core/src/main/scala/kamon/TraceContextSwap.scala b/kamon-core/src/main/scala/kamon/TraceContextSwap.scala deleted file mode 100644 index 24661445..00000000 --- a/kamon-core/src/main/scala/kamon/TraceContextSwap.scala +++ /dev/null @@ -1,26 +0,0 @@ -package kamon - -/** - * Provides support for making a TraceContext available as ThreadLocal and cleanning up afterwards. - */ -trait TraceContextSwap { - - def withContext[A](ctx: Option[TraceContext], body: => A): A = withContext(ctx, body, body) - - def withContext[A](ctx: Option[TraceContext], primary: => A, fallback: => A): A = { - ctx match { - case Some(context) => { - Tracer.set(context) - val bodyResult = primary - Tracer.clear - - bodyResult - } - case None => fallback - } - - } - -} - -object TraceContextSwap extends TraceContextSwap diff --git a/kamon-core/src/main/scala/kamon/TransactionPublisher.scala b/kamon-core/src/main/scala/kamon/TransactionPublisher.scala deleted file mode 100644 index 0626b91d..00000000 --- a/kamon-core/src/main/scala/kamon/TransactionPublisher.scala +++ /dev/null @@ -1,15 +0,0 @@ -package kamon - -import akka.actor.Actor -import java.util.UUID - -class TransactionPublisher extends Actor { - - def receive = { - case FullTransaction(id, entries) => println(s"I got a full tran: $id - $entries") - } - -} - - -case class FullTransaction(id: UUID, entries: List[TraceEntry]) diff --git a/kamon-core/src/main/scala/kamon/executor/eventbus.scala b/kamon-core/src/main/scala/kamon/executor/eventbus.scala deleted file mode 100644 index a1c099d4..00000000 --- a/kamon-core/src/main/scala/kamon/executor/eventbus.scala +++ /dev/null @@ -1,103 +0,0 @@ -package kamon.executor - -import akka.event.ActorEventBus -import akka.event.LookupClassification -import akka.actor._ -import java.util.concurrent.TimeUnit - -import kamon.{Tracer, CodeBlockExecutionTime, Kamon, TraceContext} -import akka.util.Timeout -import scala.util.{Random, Success, Failure} -import scala.concurrent.Future - -trait Message - -case class PostMessage(text:String) extends Message - -case class MessageEvent(val channel:String, val message:Message) - -class AppActorEventBus extends ActorEventBus with LookupClassification{ - type Event = MessageEvent - type Classifier=String - protected def mapSize(): Int={ - 10 - } - - protected def classify(event: Event): Classifier={ - event.channel - } - - protected def publish(event: Event, subscriber: Subscriber): Unit={ - subscriber ! event - } -} -case class Ping() -case class Pong() - -class PingActor extends Actor with ActorLogging { - - val pong = context.actorOf(Props[PongActor], "Pong") - val random = new Random() - def receive = { - case Pong() => { - //Thread.sleep(random.nextInt(2000)) - //log.info("Message from Ping") - pong ! Ping() - } - } -} - -class PongActor extends Actor with ActorLogging { - def receive = { - case Ping() => { - sender ! Pong() - } - } -} - - -object TryAkka extends App{ - val system = ActorSystem("MySystem") - val appActorEventBus=new AppActorEventBus - val NEW_POST_CHANNEL="/posts/new" - val subscriber = system.actorOf(Props(new Actor { - def receive = { - case d: MessageEvent => println(d) - } - })) - - Tracer.start - for(i <- 1 to 4) { - val ping = system.actorOf(Props[PingActor], "Ping" + i) - ping ! Pong() - } - - - def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${Tracer.context}] : $body") - - /* - val newRelicReporter = new NewRelicReporter(registry) - newRelicReporter.start(1, TimeUnit.SECONDS) - -*/ - import akka.pattern.ask - implicit val timeout = Timeout(10, TimeUnit.SECONDS) - implicit def execContext = system.dispatcher - - - - //Tracer.start - - Tracer.context.get.append(CodeBlockExecutionTime("some-block", System.nanoTime(), System.nanoTime())) - threadPrintln("Before doing it") - val f = Future { threadPrintln("This is happening inside the future body") } - - Tracer.stop - - - //Thread.sleep(3000) - //system.shutdown() - -/* appActorEventBus.subscribe(subscriber, NEW_POST_CHANNEL) - appActorEventBus.publish(MessageEvent(NEW_POST_CHANNEL,PostMessage(text="hello world")))*/ -}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala new file mode 100644 index 00000000..4e47c2a4 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala @@ -0,0 +1,41 @@ +package kamon.instrumentation + +import akka.actor.{Props, ActorSystem, ActorRef} +import akka.dispatch.{MessageDispatcher, Envelope} +import kamon.{Tracer, TraceContext} +import kamon.instrumentation.SimpleContextPassingInstrumentation.SimpleTraceMessage + +trait ActorInstrumentationConfiguration { + def sendMessageTransformation(from: ActorRef, to: ActorRef, message: Any): Any + def receiveInvokeInstrumentation(system: ActorSystem, self: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): ActorReceiveInvokeInstrumentation +} + + +trait ActorReceiveInvokeInstrumentation { + def preReceive(envelope: Envelope): (Envelope, Option[TraceContext]) +} + +object ActorReceiveInvokeInstrumentation { + val noopPreReceive = new ActorReceiveInvokeInstrumentation{ + def preReceive(envelope: Envelope): (Envelope, Option[TraceContext]) = (envelope, None) + } +} + +class SimpleContextPassingInstrumentation extends ActorInstrumentationConfiguration { + def sendMessageTransformation(from: ActorRef, to: ActorRef, message: Any): Any = SimpleTraceMessage(message, Tracer.context) + + def receiveInvokeInstrumentation(system: ActorSystem, self: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): ActorReceiveInvokeInstrumentation = { + new ActorReceiveInvokeInstrumentation { + def preReceive(envelope: Envelope): (Envelope, Option[TraceContext]) = envelope match { + case env @ Envelope(SimpleTraceMessage(msg, ctx), _) => (env.copy(message = msg), ctx) + case anyOther => (anyOther, None) + } + } + } +} + +object SimpleContextPassingInstrumentation { + case class SimpleTraceMessage(message: Any, context: Option[TraceContext]) +} + + diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingInstrumentation.scala new file mode 100644 index 00000000..47d1756f --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingInstrumentation.scala @@ -0,0 +1,31 @@ +package kamon.instrumentation + +import org.aspectj.lang.annotation.{Around, Pointcut, DeclareMixin, Aspect} +import kamon.{Tracer, TraceContext} +import org.aspectj.lang.ProceedingJoinPoint +import org.slf4j.MDC + +@Aspect +class ActorLoggingInstrumentation { + + + @DeclareMixin("akka.event.Logging.LogEvent+") + def traceContextMixin: ContextAware = new ContextAware { + def traceContext: Option[TraceContext] = Tracer.context() + } + + @Pointcut("execution(* akka.event.slf4j.Slf4jLogger.withMdc(..)) && args(logSource, logEvent, logStatement)") + def withMdcInvocation(logSource: String, logEvent: ContextAware, logStatement: () => _): Unit = {} + + @Around("withMdcInvocation(logSource, logEvent, logStatement)") + def putTraceContextInMDC(pjp: ProceedingJoinPoint, logSource: String, logEvent: ContextAware, logStatement: () => _): Unit = { + logEvent.traceContext match { + case Some(ctx) => + MDC.put("uow", ctx.uow) + pjp.proceed() + MDC.remove("uow") + + case None => pjp.proceed() + } + } +} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala index 7d3e36ca..7b5d5339 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala @@ -4,116 +4,46 @@ import org.aspectj.lang.annotation._ import org.aspectj.lang.ProceedingJoinPoint import akka.actor.{Props, ActorSystem, ActorRef} import kamon.{Tracer, TraceContext} -import akka.dispatch.{MessageDispatcher, Envelope} +import akka.dispatch.{Envelope, MessageDispatcher} import com.codahale.metrics.Timer -import kamon.metric.{MetricDirectory, Metrics} import scala.Some +import kamon.trace.context.TracingAwareContext case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context) - +case class DefaultTracingAwareEnvelopeContext(traceContext: Option[TraceContext] = Tracer.traceContext.value, timestamp: Long = System.nanoTime) extends TracingAwareContext @Aspect -class ActorRefTellInstrumentation { - import ProceedingJoinPointPimp._ - - val t2 = Metrics.registry.timer("some" + "LATENCY") - - @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && !within(akka.event.Logging.StandardOutLogger) && !within(akka.pattern.PromiseActorRef) && !within(akka.actor.DeadLetterActorRef) && target(actor) && args(message, sender)") - def sendingMessageToActorRef(actor: ActorRef, message: Any, sender: ActorRef) = {} - - @Around("sendingMessageToActorRef(actor, message, sender)") - def around(pjp: ProceedingJoinPoint, actor: ActorRef, message: Any, sender: ActorRef): Unit = { - - //val actorName = MetricDirectory.nameForActor(actor) - //val t = Metrics.registry.timer(actorName + "LATENCY") - //println(s"Wrapped message from [$sender] to [$actor] with content: [$message]") - pjp.proceedWithTarget(actor, TraceableMessage(Tracer.context, message, t2.time()), sender) - - } -} - - -@Aspect("perthis(actorCellCreation(..))") class ActorCellInvokeInstrumentation { - var processingTimeTimer: Timer = _ - var shouldTrack = false - - // AKKA 2.2 introduces the dispatcher parameter. Maybe we could provide a dual pointcut. - @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, dispatcher, parent)") def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {} - @After("actorCellCreation(system, ref, props, dispatcher, parent)") - def registerMetricsInRegistry(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = { - val actorName = MetricDirectory.nameForActor(ref) - val histogramName = MetricDirectory.nameForMailbox(system.name, actorName) - - //println("=====> Created ActorCell for: "+ref.toString()) - /** TODO: Find a better way to filter the things we don't want to measure. */ - //if(system.name != "kamon" && actorName.startsWith("/user")) { - processingTimeTimer = Metrics.registry.timer(histogramName + "/PROCESSINGTIME") - shouldTrack = true - //} - } - - @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && args(envelope)") def invokingActorBehaviourAtActorCell(envelope: Envelope) = {} - @Around("invokingActorBehaviourAtActorCell(envelope)") def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = { - import ProceedingJoinPointPimp._ - //println("ENVELOPE --------------------->"+envelope) - envelope match { - case Envelope(TraceableMessage(ctx, msg, timer), sender) => { - //timer.stop() - - val originalEnvelope = envelope.copy(message = msg) + //safe cast + val msgContext = envelope.asInstanceOf[TracingAwareContext].traceContext - //println("PROCESSING TIME TIMER: "+processingTimeTimer) - val pt = processingTimeTimer.time() - ctx match { - case Some(c) => { - Tracer.set(c) - //println(s"ENVELOPE ORIGINAL: [$c]---------------->"+originalEnvelope) - pjp.proceedWith(originalEnvelope) - Tracer.clear - } - case None => pjp.proceedWith(originalEnvelope) - } - pt.stop() - } - case _ => pjp.proceed + Tracer.traceContext.withValue(msgContext) { + pjp.proceed() } } } - @Aspect -class UnregisteredActorRefInstrumentation { - @Pointcut("execution(* akka.spray.UnregisteredActorRefBase+.handle(..)) && args(message, sender)") - def sprayResponderHandle(message: Any, sender: ActorRef) = {} +class EnvelopeTracingContext { - @Around("sprayResponderHandle(message, sender)") - def sprayInvokeAround(pjp: ProceedingJoinPoint, message: Any, sender: ActorRef): Unit = { - import ProceedingJoinPointPimp._ - println("Handling unregistered actor ref message: "+message) - message match { - case TraceableMessage(ctx, msg, timer) => { - timer.stop() + @DeclareMixin("akka.dispatch.Envelope") + def mixin: TracingAwareContext = DefaultTracingAwareEnvelopeContext() - ctx match { - case Some(c) => { - Tracer.set(c) - pjp.proceedWith(msg.asInstanceOf[AnyRef]) // TODO: define if we should use Any or AnyRef and unify with the rest of the instrumentation. - Tracer.clear - } - case None => pjp.proceedWith(msg.asInstanceOf[AnyRef]) - } - } - case _ => pjp.proceed - } + @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)") + def requestRecordInit(ctx: TracingAwareContext): Unit = {} + + @After("requestRecordInit(ctx)") + def whenCreatedRequestRecord(ctx: TracingAwareContext): Unit = { + // Necessary to force the initialization of TracingAwareRequestContext at the moment of creation. + ctx.traceContext } } diff --git a/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala index 30041321..992cfa82 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala @@ -8,10 +8,12 @@ import scala.Some /** * Marker interface, just to make sure we don't instrument all the Runnables in the classpath. */ -trait TraceContextAwareRunnable extends Runnable {} +trait TraceContextAwareRunnable { + def traceContext: Option[TraceContext] +} -@Aspect("perthis(instrumentedRunnableCreation())") +@Aspect class RunnableInstrumentation { /** @@ -19,43 +21,38 @@ class RunnableInstrumentation { * while their run method is executed. */ @DeclareMixin("scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable") - def onCompleteCallbacksRunnable: TraceContextAwareRunnable = null + def onCompleteCallbacksRunnable: TraceContextAwareRunnable = new TraceContextAwareRunnable { + val traceContext: Option[TraceContext] = Tracer.traceContext.value + } /** * Pointcuts */ - @Pointcut("execution(kamon.instrumentation.TraceContextAwareRunnable+.new(..))") - def instrumentedRunnableCreation(): Unit = {} - - @Pointcut("execution(* kamon.instrumentation.TraceContextAwareRunnable.run())") - def runnableExecution() = {} - - - /** - * Aspect members - */ + @Pointcut("execution(kamon.instrumentation.TraceContextAwareRunnable+.new(..)) && this(runnable)") + def instrumentedRunnableCreation(runnable: TraceContextAwareRunnable): Unit = {} - private val traceContext = Tracer.context + @Pointcut("execution(* kamon.instrumentation.TraceContextAwareRunnable+.run()) && this(runnable)") + def runnableExecution(runnable: TraceContextAwareRunnable) = {} - /** - * Advices - */ - import kamon.TraceContextSwap.withContext - @Before("instrumentedRunnableCreation()") - def beforeCreation = { - //println((new Throwable).getStackTraceString) + @After("instrumentedRunnableCreation(runnable)") + def beforeCreation(runnable: TraceContextAwareRunnable): Unit = { + // Force traceContext initialization. + runnable.traceContext } - @Around("runnableExecution()") - def around(pjp: ProceedingJoinPoint) = { + @Around("runnableExecution(runnable)") + def around(pjp: ProceedingJoinPoint, runnable: TraceContextAwareRunnable): Any = { import pjp._ - withContext(traceContext, proceed()) + Tracer.traceContext.withValue(runnable.traceContext) { + proceed() + } } } + diff --git a/kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala deleted file mode 100644 index 74261403..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala +++ /dev/null @@ -1,49 +0,0 @@ -package kamon.instrumentation - -import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram} -import org.aspectj.lang.annotation.{After, Pointcut, DeclareMixin, Aspect} - -class ActorCage(val name: String, val size: Int) { - - def doIt: Unit = println("name") -} - -trait CageMonitoring { - def histogram: Histogram - def count(value: Int): Unit -} - -class CageMonitoringImp extends CageMonitoring{ - final val histogram = new Histogram(new ExponentiallyDecayingReservoir()) - - def count(value: Int) = histogram.update(value) - -} - - -@Aspect -class InceptionAspect { - - @DeclareMixin("kamon.instrumentation.ActorCage") - def mixin: CageMonitoring = new CageMonitoringImp - - - @Pointcut("execution(* kamon.instrumentation.ActorCage.doIt()) && target(actorCage)") - def theActorCageDidIt(actorCage: CageMonitoring) = {} - - @After("theActorCageDidIt(actorCage)") - def afterDoingIt(actorCage: CageMonitoring) = { - actorCage.count(1) - actorCage.histogram.getSnapshot.dump(System.out) - } - - - -} - - -object Runner extends App { - val cage = new ActorCage("ivan", 10) - - cage.doIt -} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala new file mode 100644 index 00000000..2239f382 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala @@ -0,0 +1,115 @@ +package kamon.instrumentation + +import org.aspectj.lang.annotation._ +import kamon.{TraceContext, Tracer} +import kamon.trace.UowTracing._ +import kamon.trace.context.TracingAwareContext +import org.aspectj.lang.ProceedingJoinPoint +import spray.http.HttpRequest +import kamon.trace.UowTracing.Finish +import kamon.trace.UowTracing.Rename +import spray.http.HttpHeaders.Host + +//import spray.can.client.HttpHostConnector.RequestContext + +trait ContextAware { + def traceContext: Option[TraceContext] +} + +trait TimedContextAware { + def timestamp: Long + def traceContext: Option[TraceContext] +} + +@Aspect +class SprayOpenRequestContextTracing { + @DeclareMixin("spray.can.server.OpenRequestComponent.DefaultOpenRequest") + def mixinContextAwareToOpenRequest: ContextAware = new ContextAware { + val traceContext: Option[TraceContext] = Tracer.traceContext.value + } + + @DeclareMixin("spray.can.client.HttpHostConnector.RequestContext") + def mixinContextAwareToRequestContext: TimedContextAware = new TimedContextAware { + val timestamp: Long = System.nanoTime() + val traceContext: Option[TraceContext] = Tracer.traceContext.value + } +} + +@Aspect +class SprayServerInstrumentation { + + @Pointcut("execution(spray.can.server.OpenRequestComponent$DefaultOpenRequest.new(..)) && this(openRequest) && args(*, request, *, *)") + def openRequestInit(openRequest: ContextAware, request: HttpRequest): Unit = {} + + @After("openRequestInit(openRequest, request)") + def afterInit(openRequest: ContextAware, request: HttpRequest): Unit = { + Tracer.start + openRequest.traceContext + + Tracer.context().map(_.tracer ! Rename(request.uri.path.toString())) + } + + @Pointcut("execution(* spray.can.server.OpenRequestComponent$DefaultOpenRequest.handleResponseEndAndReturnNextOpenRequest(..)) && target(openRequest)") + def openRequestCreation(openRequest: ContextAware): Unit = {} + + @After("openRequestCreation(openRequest)") + def afterFinishingRequest(openRequest: ContextAware): Unit = { + val original = openRequest.traceContext + Tracer.context().map(_.tracer ! Finish()) + + if(Tracer.context() != original) { + println(s"OMG DIFFERENT Original: [${original}] - Came in: [${Tracer.context}]") + } + } + + @Pointcut("execution(spray.can.client.HttpHostConnector.RequestContext.new(..)) && this(ctx) && args(request, *, *, *)") + def requestRecordInit(ctx: TimedContextAware, request: HttpRequest): Unit = {} + + @After("requestRecordInit(ctx, request)") + def whenCreatedRequestRecord(ctx: TimedContextAware, request: HttpRequest): Unit = { + // Necessary to force the initialization of TracingAwareRequestContext at the moment of creation. + for{ + tctx <- ctx.traceContext + host <- request.header[Host] + } tctx.tracer ! WebExternalStart(ctx.timestamp, host.host) + } + + + + @Pointcut("execution(* spray.can.client.HttpHostConnectionSlot.dispatchToCommander(..)) && args(requestContext, message)") + def dispatchToCommander(requestContext: TimedContextAware, message: Any): Unit = {} + + @Around("dispatchToCommander(requestContext, message)") + def aroundDispatchToCommander(pjp: ProceedingJoinPoint, requestContext: TimedContextAware, message: Any) = { + println("Completing the request with context: " + requestContext.traceContext) + + Tracer.traceContext.withValue(requestContext.traceContext) { + requestContext.traceContext.map { + tctx => tctx.tracer ! WebExternalFinish(requestContext.timestamp) + } + pjp.proceed() + } + + } + + + @Pointcut("execution(* spray.can.client.HttpHostConnector.RequestContext.copy(..)) && this(old)") + def copyingRequestContext(old: TimedContextAware): Unit = {} + + @Around("copyingRequestContext(old)") + def aroundCopyingRequestContext(pjp: ProceedingJoinPoint, old: TimedContextAware) = { + println("Instrumenting the request context copy.") + Tracer.traceContext.withValue(old.traceContext) { + pjp.proceed() + } + } +} + +case class DefaultTracingAwareRequestContext(traceContext: Option[TraceContext] = Tracer.context(), timestamp: Long = System.nanoTime) extends TracingAwareContext + +@Aspect +class SprayRequestContextTracing { + + @DeclareMixin("spray.can.client.HttpHostConnector.RequestContext") + def mixin: TracingAwareContext = DefaultTracingAwareRequestContext() +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/MetricFilter.scala b/kamon-core/src/main/scala/kamon/metric/MetricFilter.scala deleted file mode 100644 index fb117968..00000000 --- a/kamon-core/src/main/scala/kamon/metric/MetricFilter.scala +++ /dev/null @@ -1,6 +0,0 @@ -package kamon.metric - -object MetricFilter { - def actorSystem(system: String): Boolean = !system.startsWith("kamon") - def actor(path: String, system: String): Boolean = true -} diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala b/kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala deleted file mode 100644 index 5b4ceaf4..00000000 --- a/kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala +++ /dev/null @@ -1,51 +0,0 @@ -package kamon.metric - -import com.codahale.metrics._ - -object MetricsUtils { - - def markMeter[T](meter:Meter)(f: => T): T = { - meter.mark() - f - } -// -// def incrementCounter(key: String) { -// counters.getOrElseUpdate(key, (metricsGroup.counter(s"${key}-counter"))).count -// } -// -// def markMeter(key: String) { -// meters.getOrElseUpdate(key, metricsGroup.meter(s"${key}-meter", "actor", "actor-message-counter", TimeUnit.SECONDS)).mark() -// } -// -// def trace[T](key: String)(f: => T): T = { -// val timer = timers.getOrElseUpdate(key, (metricsGroup.timer(s"${key}-timer")) ) -// timer.time(f) -// } - -// def markAndCountMeter[T](key: String)(f: => T): T = { -// markMeter(key) -// f -// } -// -// def traceAndCount[T](key: String)(f: => T): T = { -// incrementCounter(key) -// trace(key) { -// f -// } - //} - -// implicit def runnable(f: () => Unit): Runnable = -// new Runnable() { def run() = f() } -// -// -// import java.util.concurrent.Callable -// -// implicit def callable[T](f: () => T): Callable[T] = -// new Callable[T]() { def call() = f() } - -// private val actorCounter:Counter = new Counter -// private val actorTimer:Timer = new Timer -// -// metricsRegistry.register(s"counter-for-${actorName}", actorCounter) -// metricsRegistry.register(s"timer-for-${actorName}", actorTimer) -}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala b/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala new file mode 100644 index 00000000..106f27e2 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala @@ -0,0 +1,52 @@ +package kamon.newrelic + +import akka.actor.Actor +import kamon.trace.UowTrace +import com.newrelic.api.agent.{Response, Request, Trace, NewRelic} +import kamon.trace.UowTracing.{WebExternal, WebExternalFinish, WebExternalStart} +import java.util +import java.util.Date + + +class NewRelicReporting extends Actor { + def receive = { + case trace: UowTrace => recordTransaction(trace) + } + + def recordTransaction(uowTrace: UowTrace): Unit = { + val time = ((uowTrace.segments.last.timestamp - uowTrace.segments.head.timestamp)/1E9) + + NewRelic.recordMetric("WebTransaction/Custom" + uowTrace.name, time.toFloat ) + NewRelic.recordMetric("WebTransaction", time.toFloat) + NewRelic.recordMetric("HttpDispatcher", time.toFloat) + + uowTrace.segments.collect { case we: WebExternal => we }.foreach { webExternalTrace => + val external = ((webExternalTrace.finish - webExternalTrace.start)/1E9).toFloat + + println("Web External: " + webExternalTrace) + NewRelic.recordMetric(s"External/${webExternalTrace.host}/http", external) + NewRelic.recordMetric(s"External/${webExternalTrace.host}/all", external) + NewRelic.recordMetric(s"External/${webExternalTrace.host}/http/WebTransaction/Custom" + uowTrace.name, external) + } + + + val allExternals = uowTrace.segments.collect { case we: WebExternal => we } sortBy(_.timestamp) + + + def measureExternal(accum: Long, lastEnd: Long, segments: Seq[WebExternal]): Long = segments match { + case Nil => accum + case head :: tail => + if(head.start > lastEnd) + measureExternal(accum + (head.finish-head.start), head.finish, tail) + else + measureExternal(accum + (head.finish-lastEnd), head.finish, tail) + } + + val external = measureExternal(0, 0, allExternals) / 1E9 + + + NewRelic.recordMetric(s"External/all", external.toFloat) + NewRelic.recordMetric(s"External/allWeb", external.toFloat) + + } +} diff --git a/kamon-core/src/main/scala/kamon/trace/UowDirectives.scala b/kamon-core/src/main/scala/kamon/trace/UowDirectives.scala new file mode 100644 index 00000000..392f53b8 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/UowDirectives.scala @@ -0,0 +1,28 @@ +package kamon.trace + +import spray.routing.directives.BasicDirectives +import spray.routing._ +import kamon.Tracer +import java.util.concurrent.atomic.AtomicLong +import scala.util.Try +import java.net.InetAddress + +trait UowDirectives extends BasicDirectives { + def uow: Directive0 = mapRequest { request => + val uowHeader = request.headers.find(_.name == "X-UOW") + + val generatedUow = uowHeader.map(_.value).getOrElse(UowDirectives.newUow) + Tracer.set(Tracer.context().getOrElse(Tracer.newTraceContext()).copy(uow = generatedUow)) + + request + } +} + +object UowDirectives { + val uowCounter = new AtomicLong + + val hostnamePrefix = Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-localhost") + + def newUow = "%s-%s".format(hostnamePrefix, uowCounter.incrementAndGet()) + +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/UowTracing.scala b/kamon-core/src/main/scala/kamon/trace/UowTracing.scala new file mode 100644 index 00000000..b09478cc --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/UowTracing.scala @@ -0,0 +1,57 @@ +package kamon.trace + +import akka.actor._ +import scala.concurrent.duration.Duration +import kamon.trace.UowTracing._ + +sealed trait UowSegment { + def timestamp: Long +} + +trait AutoTimestamp extends UowSegment { + val timestamp = System.nanoTime +} + +object UowTracing { + case class Start() extends AutoTimestamp + case class Finish() extends AutoTimestamp + case class Rename(name: String) extends AutoTimestamp + case class WebExternalStart(id: Long, host: String) extends AutoTimestamp + case class WebExternalFinish(id: Long) extends AutoTimestamp + case class WebExternal(start: Long, finish: Long, host: String) extends AutoTimestamp +} + +case class UowTrace(name: String, segments: Seq[UowSegment]) + + +class UowTraceAggregator(reporting: ActorRef, aggregationTimeout: Duration) extends Actor with ActorLogging { + context.setReceiveTimeout(aggregationTimeout) + + var name: Option[String] = None + var segments: Seq[UowSegment] = Nil + + var pendingExternal = List[WebExternalStart]() + + def receive = { + case finish: Finish => segments = segments :+ finish; finishTracing() + case wes: WebExternalStart => pendingExternal = pendingExternal :+ wes + case finish @ WebExternalFinish(id) => pendingExternal.find(_.id == id).map(start => { + segments = segments :+ WebExternal(start.timestamp, finish.timestamp, start.host) + }) + case Rename(newName) => name = Some(newName) + case segment: UowSegment => segments = segments :+ segment + case ReceiveTimeout => + log.warning("Transaction {} did not complete properly, the recorded segments are: {}", name, segments) + context.stop(self) + } + + def finishTracing(): Unit = { + reporting ! UowTrace(name.getOrElse("UNKNOWN"), segments) + println("Recorded Segments: " + segments) + context.stop(self) + } +} + +object UowTraceAggregator { + def props(reporting: ActorRef, aggregationTimeout: Duration) = Props(classOf[UowTraceAggregator], reporting, aggregationTimeout) +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/context/TracingAwareContext.scala b/kamon-core/src/main/scala/kamon/trace/context/TracingAwareContext.scala new file mode 100644 index 00000000..3766dd22 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/context/TracingAwareContext.scala @@ -0,0 +1,8 @@ +package kamon.trace.context + +import kamon.TraceContext + +trait TracingAwareContext { + def traceContext: Option[TraceContext] + def timestamp: Long +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/spraytest/ClientTest.scala b/kamon-core/src/main/scala/spraytest/ClientTest.scala deleted file mode 100644 index 07532d0a..00000000 --- a/kamon-core/src/main/scala/spraytest/ClientTest.scala +++ /dev/null @@ -1,55 +0,0 @@ -package spraytest - -import akka.actor.ActorSystem -import spray.client.pipelining._ -import spray.httpx.SprayJsonSupport -import spray.json._ -import scala.concurrent.Future -import spray.can.Http -import akka.io.IO - -/** - * BEGIN JSON Infrastructure - */ -case class Container(data: List[PointOfInterest]) -case class Geolocation(latitude: Float, longitude: Float) -case class PointOfInterest(ma: Option[String], a: Option[String], c: String, s: Option[String], geolocation: Geolocation) - -object GeoJsonProtocol extends DefaultJsonProtocol { - implicit val geolocationFormat = jsonFormat2(Geolocation) - implicit val pointOfInterestFormat = jsonFormat5(PointOfInterest) - implicit val containerFormat = jsonFormat1(Container) -} -/** END-OF JSON Infrastructure */ - - - - - - -class ClientTest extends App { - implicit val actorSystem = ActorSystem("spray-client-test") - import actorSystem.dispatcher - - - import GeoJsonProtocol._ - import SprayJsonSupport._ - - - val actor = IO(Http) - - val pipeline = sendReceive ~> unmarshal[Container] - - val response = pipeline { - Get("http://geo.despegar.com/geo-services-web/service/Autocomplete/DESAR/1/0/0/10/0/0/Obelisco") - } onSuccess { - case a => { - println(a) - } - } -} - - - - - diff --git a/kamon-core/src/main/scala/spraytest/FutureTesting.scala b/kamon-core/src/main/scala/spraytest/FutureTesting.scala deleted file mode 100644 index b864d6d6..00000000 --- a/kamon-core/src/main/scala/spraytest/FutureTesting.scala +++ /dev/null @@ -1,81 +0,0 @@ -package spraytest -/* -import akka.actor.ActorSystem -import scala.concurrent.{ExecutionContext, Future} -import scala.util.{Try, Success} -import kamon.actor.TransactionContext - -object FutureTesting extends App { - - val actorSystem = ActorSystem("future-testing") - implicit val ec = actorSystem.dispatcher - implicit val tctx = TransactionContext(11, Nil) - - threadPrintln("In the initial Thread") - - - val f = TraceableFuture { - threadPrintln(s"Processing the Future, and the current context is: ${TransactionContext.current.get()}") - } - - f.onComplete({ - case Success(a) => threadPrintln(s"Processing the first callback, and the current context is: ${TransactionContext.current.get()}") - }) - - f.onComplete({ - case Success(a) => threadPrintln(s"Processing the second callback, and the current context is: ${TransactionContext.current.get()}") - }) - - - - - - - - - def threadPrintln(message: String) = println(s"Thread[${Thread.currentThread.getName}] says: [${message}]") - -} - - - - -trait TransactionContextWrapper { - def wrap[In, Out](f: => In => Out, tranContext: TransactionContext) = { - TransactionContext.current.set(tranContext.fork) - println(s"SetContext to: ${tranContext}") - val result = f - - TransactionContext.current.remove() - result - } - -} - -class TraceableFuture[T](val future: Future[T]) extends TransactionContextWrapper { - def onComplete[U](func: Try[T] => U)(implicit transactionContext: TransactionContext, executor: ExecutionContext): Unit = { - future.onComplete(wrap(func, transactionContext)) - } -} - -object TraceableFuture { - - implicit def toRegularFuture[T](tf: TraceableFuture[T]) = tf.future - - def apply[T](body: => T)(implicit transactionContext: TransactionContext, executor: ExecutionContext) = { - val wrappedBody = contextSwitchWrapper(body, TransactionContext(transactionContext.dispatcherName, Nil)) - - new TraceableFuture(Future { wrappedBody }) - } - - - - - def contextSwitchWrapper[T](body: => T, transactionContext: TransactionContext) = { - TransactionContext.current.set(transactionContext) - val result = body - TransactionContext.current.remove() - result - } -}*/ - diff --git a/kamon-core/src/main/scala/test/PingPong.scala b/kamon-core/src/main/scala/test/PingPong.scala deleted file mode 100644 index f9d6869c..00000000 --- a/kamon-core/src/main/scala/test/PingPong.scala +++ /dev/null @@ -1,34 +0,0 @@ -package test - -import akka.actor.{Props, Actor, ActorSystem} - -object PingPong extends App { - - val as = ActorSystem("ping-pong") - - val pinger = as.actorOf(Props[Pinger]) - val ponger = as.actorOf(Props[Ponger]) - - pinger.tell(Pong, ponger) - - - Thread.sleep(30000) - as.shutdown() - - -} - -case object Ping -case object Pong - -class Pinger extends Actor { - def receive = { - case Pong => sender ! Ping - } -} - -class Ponger extends Actor { - def receive = { - case Ping => sender ! Pong - } -} diff --git a/kamon-core/src/main/scala/test/SimpleRequestProcessor.scala b/kamon-core/src/main/scala/test/SimpleRequestProcessor.scala new file mode 100644 index 00000000..b1727d2b --- /dev/null +++ b/kamon-core/src/main/scala/test/SimpleRequestProcessor.scala @@ -0,0 +1,97 @@ +package test + +import akka.actor._ +import kamon.Tracer +import spray.routing.SimpleRoutingApp +import akka.util.Timeout +import spray.httpx.RequestBuilding +import scala.concurrent.{Await, Future} +import kamon.trace.UowDirectives + +object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuilding with UowDirectives { + import scala.concurrent.duration._ + import spray.client.pipelining._ + import akka.pattern.ask + + implicit val system = ActorSystem("test") + import system.dispatcher + + implicit val timeout = Timeout(30 seconds) + + val pipeline = sendReceive + val replier = system.actorOf(Props[Replier]) + + startServer(interface = "localhost", port = 9090) { + get { + path("test"){ + uow { + complete { + val futures = pipeline(Get("http://10.254.209.14:8000/")).map(r => "Ok") :: pipeline(Get("http://10.254.209.14:8000/")).map(r => "Ok") :: Nil + + Future.sequence(futures).map(l => "Ok") + } + } + } ~ + path("reply" / Segment) { reqID => + uow { + complete { + if (Tracer.context().isEmpty) + println("ROUTE NO CONTEXT") + + (replier ? reqID).mapTo[String] + } + } + } ~ + path("ok") { + complete("ok") + } ~ + path("future") { + dynamic { + complete(Future { "OK" }) + } + } ~ + path("error") { + complete { + throw new NullPointerException + "okk" + } + } + } + } + +} + +object Verifier extends App { + + def go: Unit = { + import scala.concurrent.duration._ + import spray.client.pipelining._ + + implicit val system = ActorSystem("test") + import system.dispatcher + + implicit val timeout = Timeout(30 seconds) + + val pipeline = sendReceive + + val futures = Future.sequence(for(i <- 1 to 500) yield { + pipeline(Get("http://127.0.0.1:9090/reply/"+i)).map(r => r.entity.asString == i.toString) + }) + println("Everything is: "+ Await.result(futures, 10 seconds).forall(a => a == true)) + } + + + + +} + +class Replier extends Actor with ActorLogging { + def receive = { + case anything => + if(Tracer.context.isEmpty) + log.warning("PROCESSING A MESSAGE WITHOUT CONTEXT") + + log.info("Processing at the Replier") + sender ! anything + } +} diff --git a/kamon-core/src/main/resources/newrelic.yml b/kamon-core/src/test/resources/newrelic.yml index 1b1ad53b..77923e9c 100644 --- a/kamon-core/src/main/resources/newrelic.yml +++ b/kamon-core/src/test/resources/newrelic.yml @@ -54,7 +54,7 @@ common: &default_settings # Log all data to and from New Relic in plain text. # This setting is dynamic, so changes do not require restarting your application. # Default is false. - #audit_mode: true + audit_mode: true # The number of log files to use. # Default is 1. diff --git a/kamon-core/src/test/scala/ExtraSpec.scala b/kamon-core/src/test/scala/ExtraSpec.scala new file mode 100644 index 00000000..b8dc053d --- /dev/null +++ b/kamon-core/src/test/scala/ExtraSpec.scala @@ -0,0 +1,34 @@ +import akka.actor.ActorSystem +import akka.testkit.TestKit +import org.scalatest.WordSpecLike +import shapeless._ + +class ExtraSpec extends TestKit(ActorSystem("ExtraSpec")) with WordSpecLike { + + "the Extra pattern helper" should { + "be constructed from a finite number of types" in { + Extra.expecting[String :: Int :: HNil].as[Person] + } + } + + case class Person(name: String, age: Int) +} + +/** + * Desired Features: + * 1. Expect messages of different types, apply a function and forward to some other. + */ + +object Extra { + def expecting[T <: HList] = new Object { + def as[U <: Product] = ??? + } +} + +/* +extra of { + expect[A] in { actor ! msg} + expect[A] in { actor ! msg} +} as (A, A) pipeTo (z)*/ + + diff --git a/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala b/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala index ccc7740b..21be4a73 100644 --- a/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala +++ b/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala @@ -8,6 +8,7 @@ import kamon.{TraceContext, Tracer} import akka.pattern.{pipe, ask} import akka.util.Timeout import scala.concurrent.duration._ +import scala.concurrent.{Await, Future} import akka.routing.RoundRobinRouter @@ -35,11 +36,30 @@ class ActorInstrumentationSpec extends TestKit(ActorSystem("ActorInstrumentation expectMsg(Some(testTraceContext)) } - "propagate the trace context to actors behind a rounter" in new RoutedTraceContextEchoFixture { + "propagate the trace context to actors behind a router" in new RoutedTraceContextEchoFixture { val contexts: Seq[Option[TraceContext]] = for(_ <- 1 to 10) yield Some(tellWithNewContext(echo, "test")) expectMsgAllOf(contexts: _*) } + + /*"propagate with many asks" in { + val echo = system.actorOf(Props[TraceContextEcho]) + val iterations = 50000 + implicit val timeout = Timeout(10 seconds) + + val futures = for(_ <- 1 to iterations) yield { + Tracer.start + val result = (echo ? "test") + Tracer.clear + + result + } + + val allResults = Await.result(Future.sequence(futures), 10 seconds) + assert(iterations == allResults.collect { + case Some(_) => 1 + }.sum) + }*/ } } diff --git a/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala index fe89695b..6010a185 100644 --- a/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala +++ b/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala @@ -16,46 +16,48 @@ class RunnableInstrumentationSpec extends WordSpec with Matchers with ScalaFutur "a instrumented runnable" when { "created in a thread that does have a TraceContext" must { "preserve the TraceContext" which { - "should be available during the run method execution" in { new FutureWithContextFixture { + "should be available during the run method execution" in new FutureWithContextFixture { whenReady(futureWithContext) { result => result.value should equal(testContext) } - }} + } + + "should be available during the execution of onComplete callbacks" in new FutureWithContextFixture { - "should be available during the execution of onComplete callbacks" in { new FutureWithContextFixture { - val onCompleteContext = Promise[TraceContext]() + val onCompleteContext = Promise[Option[TraceContext]]() + Tracer.traceContext.withValue(Some(testContext)) { futureWithContext.onComplete({ - case _ => onCompleteContext.complete(Success(Tracer.context.get)) + case _ => println("Completing second promise from: "+Thread.currentThread().getName + " With Context: " + Tracer.traceContext.value); onCompleteContext.complete(Success(Tracer.traceContext.value)) }) + } - whenReady(onCompleteContext.future) { result => - result should equal(testContext) - } - }} + whenReady(onCompleteContext.future) { result => + result should equal(Some(testContext)) + } + } } } "created in a thread that doest have a TraceContext" must { - "not capture any TraceContext for the body execution" in { new FutureWithoutContextFixture{ - + "not capture any TraceContext for the body execution" in new FutureWithoutContextFixture{ whenReady(futureWithoutContext) { result => result should equal(None) } - }} + } - "not make any TraceContext available during the onComplete callback" in { new FutureWithoutContextFixture { + "not make any TraceContext available during the onComplete callback" in new FutureWithoutContextFixture { val onCompleteContext = Promise[Option[TraceContext]]() - futureWithoutContext.onComplete({ - case _ => onCompleteContext.complete(Success(Tracer.context)) - }) + futureWithoutContext.onComplete { + case _ => onCompleteContext.complete(Success(Tracer.traceContext.value)) + } whenReady(onCompleteContext.future) { result => result should equal(None) } - }} + } } } @@ -68,14 +70,15 @@ class RunnableInstrumentationSpec extends WordSpec with Matchers with ScalaFutur class FutureWithContextFixture { val testContext = TraceContext() - Tracer.set(testContext) - val futureWithContext = Future { Tracer.context} + var futureWithContext: Future[Option[TraceContext]] = _ + Tracer.traceContext.withValue(Some(testContext)) { + futureWithContext = Future { Tracer.traceContext.value } + } } trait FutureWithoutContextFixture { - Tracer.clear // Make sure no TraceContext is available - val futureWithoutContext = Future { Tracer.context } + val futureWithoutContext = Future { Tracer.traceContext.value } } } diff --git a/kamon-core/src/test/scala/kamon/trace/TraceAggregatorSpec.scala b/kamon-core/src/test/scala/kamon/trace/TraceAggregatorSpec.scala new file mode 100644 index 00000000..60b5f06d --- /dev/null +++ b/kamon-core/src/test/scala/kamon/trace/TraceAggregatorSpec.scala @@ -0,0 +1,36 @@ +package kamon.trace + +import org.scalatest.{WordSpecLike, WordSpec} +import akka.testkit.{TestKitBase, TestKit} +import akka.actor.ActorSystem +import scala.concurrent.duration._ +import kamon.trace.UowTracing.{Finish, Rename, Start} + +class TraceAggregatorSpec extends TestKit(ActorSystem("TraceAggregatorSpec")) with WordSpecLike { + + "a TraceAggregator" should { + "send a UowTrace message out after receiving a Finish message" in new AggregatorFixture { + within(1 second) { + aggregator ! Start() + aggregator ! Finish() + + expectMsg(UowTrace("UNKNOWN", Seq(Start(), Finish()))) + } + } + + "change the uow name after receiving a Rename message" in new AggregatorFixture { + within(1 second) { + aggregator ! Start() + aggregator ! Rename("test-uow") + aggregator ! Finish() + + expectMsg(UowTrace("test-uow", Seq(Start(), Finish()))) + } + } + } + + + trait AggregatorFixture { + val aggregator = system.actorOf(UowTraceAggregator.props(testActor, 10 seconds)) + } +} |