diff options
34 files changed, 253 insertions, 427 deletions
diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop_remove_when_possible.xml index c63e17e5..207bf1b9 100644 --- a/kamon-core/src/main/resources/META-INF/aop.xml +++ b/kamon-core/src/main/resources/META-INF/aop_remove_when_possible.xml @@ -7,14 +7,14 @@ <aspects> - <aspect name="kamon.instrumentation.EnvelopeTracingContext"/> - <aspect name="kamon.instrumentation.ActorCellInvokeInstrumentation"/> + <aspect name="kamon.trace.instrumentation.EnvelopeTracingContext"/> + <aspect name="kamon.trace.instrumentation.ActorCellInvokeInstrumentation"/> <aspect name="kamon.trace.instrumentation.RunnableTracing" /> <aspect name="kamon.instrumentation.SprayRequestContextTracing"/> <aspect name="kamon.instrumentation.SprayOpenRequestContextTracing"/> <aspect name = "kamon.instrumentation.SprayServerInstrumentation"/> <aspect name="kamon.instrumentation.ActorSystemInstrumentation"/> - <aspect name="kamon.instrumentation.ActorLoggingInstrumentation"/> + <aspect name="kamon.trace.instrumentation.ActorLoggingInstrumentation"/> <!--<aspect name="kamon.instrumentation.MessageQueueInstrumentation" />--> <!--<aspect name="kamon.instrumentation.InceptionAspect"/>--> diff --git a/kamon-core/src/main/scala/kamon/AkkaExtensionSwap.scala b/kamon-core/src/main/scala/kamon/AkkaExtensionSwap.scala new file mode 100644 index 00000000..3d503d54 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/AkkaExtensionSwap.scala @@ -0,0 +1,14 @@ +package kamon + +import akka.actor.{ActorSystem, ExtensionId} +import java.util.concurrent.ConcurrentHashMap + +object AkkaExtensionSwap { + def swap(system: ActorSystem, key: ExtensionId[_], value: Kamon.Extension): Unit = { + val extensionsField = system.getClass.getDeclaredField("extensions") + extensionsField.setAccessible(true) + + val extensions = extensionsField.get(system).asInstanceOf[ConcurrentHashMap[ExtensionId[_], AnyRef]] + extensions.put(key, value) + } +} diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index 8c934f60..5d0d77a3 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -1,16 +1,6 @@ package kamon import akka.actor._ -import kamon.metric.{HistogramSnapshot, ActorSystemMetrics} -import scala.concurrent.duration.FiniteDuration -import scala.collection.concurrent.TrieMap -import kamon.instrumentation.{SimpleContextPassingInstrumentation, ActorInstrumentationConfiguration} -import kamon.metric.ActorSystemMetrics - - -object Instrument { - val instrumentation: ActorInstrumentationConfiguration = new SimpleContextPassingInstrumentation -} object Kamon { trait Extension extends akka.actor.Extension { @@ -18,8 +8,5 @@ object Kamon { } def apply[T <: Extension](key: ExtensionId[T])(implicit system: ActorSystem): ActorRef = key(system).manager - - - implicit lazy val actorSystem = ActorSystem("kamon") } diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala deleted file mode 100644 index 4e078201..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala +++ /dev/null @@ -1,42 +0,0 @@ -package kamon.instrumentation - -import akka.actor.{Props, ActorSystem, ActorRef} -import akka.dispatch.{MessageDispatcher, Envelope} -import kamon.{Tracer} -import kamon.instrumentation.SimpleContextPassingInstrumentation.SimpleTraceMessage -import kamon.trace.TraceContext - -trait ActorInstrumentationConfiguration { - def sendMessageTransformation(from: ActorRef, to: ActorRef, message: Any): Any - def receiveInvokeInstrumentation(system: ActorSystem, self: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): ActorReceiveInvokeInstrumentation -} - - -trait ActorReceiveInvokeInstrumentation { - def preReceive(envelope: Envelope): (Envelope, Option[TraceContext]) -} - -object ActorReceiveInvokeInstrumentation { - val noopPreReceive = new ActorReceiveInvokeInstrumentation{ - def preReceive(envelope: Envelope): (Envelope, Option[TraceContext]) = (envelope, None) - } -} - -class SimpleContextPassingInstrumentation extends ActorInstrumentationConfiguration { - def sendMessageTransformation(from: ActorRef, to: ActorRef, message: Any): Any = SimpleTraceMessage(message, Tracer.context) - - def receiveInvokeInstrumentation(system: ActorSystem, self: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): ActorReceiveInvokeInstrumentation = { - new ActorReceiveInvokeInstrumentation { - def preReceive(envelope: Envelope): (Envelope, Option[TraceContext]) = envelope match { - case env @ Envelope(SimpleTraceMessage(msg, ctx), _) => (env.copy(message = msg), ctx) - case anyOther => (anyOther, None) - } - } - } -} - -object SimpleContextPassingInstrumentation { - case class SimpleTraceMessage(message: Any, context: Option[TraceContext]) -} - - diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala index b4f8a475..3a091775 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala @@ -21,7 +21,7 @@ class ActorSystemInstrumentation { @After("actorSystemInstantiation(name, applicationConfig, classLoader)") def registerActorSystem(name: String, applicationConfig: Config, classLoader: ClassLoader): Unit = { - Kamon.Metric.registerActorSystem(name) + //Kamon.Metric.registerActorSystem(name) } } @@ -35,7 +35,7 @@ class ForkJoinPoolInstrumentation { @After("forkJoinPoolInstantiation(parallelism, threadFactory, exceptionHandler)") def initializeMetrics(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler): Unit = { - val (actorSystemName, dispatcherName) = threadFactory match { + /*val (actorSystemName, dispatcherName) = threadFactory match { case mtf: MonitorableThreadFactory => splitName(mtf.name, Kamon.Metric.actorSystemNames) case _ => ("Unknown", "Unknown") } @@ -45,7 +45,7 @@ class ForkJoinPoolInstrumentation { activeThreadsHistogram = m.activeThreadCount poolSizeHistogram = m.poolSize println(s"Registered $dispatcherName for actor system $actorSystemName") - } + }*/ } def splitName(threadFactoryName: String, knownActorSystems: List[String]): (String, String) = { diff --git a/kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala b/kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala deleted file mode 100644 index 70f3e54a..00000000 --- a/kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala +++ /dev/null @@ -1,51 +0,0 @@ -package kamon.metric - -import com.codahale.metrics -import metrics._ -import java.util.concurrent.TimeUnit -import java.util -import com.newrelic.api.agent.NewRelic -import scala.collection.JavaConverters._ - - -class NewRelicReporter(registry: MetricRegistry, name: String,filter: MetricFilter, rateUnit: TimeUnit, durationUnit: TimeUnit) extends ScheduledReporter(registry, name, filter, rateUnit, durationUnit) { - - - - private[NewRelicReporter] def processMeter(name: String, meter: Meter) { - NewRelic.recordMetric("Custom/Actor/MessagesPerSecond", meter.getMeanRate().toFloat) - } - - private[NewRelicReporter] def processCounter(name:String, counter:Counter) { - println(s"Logging to NewRelic: ${counter.getCount}") - - } - - -/* def processGauge(name: String, gauge: Gauge[_]) = { - println(s"the value is: "+gauge.getValue) - NewRelic.recordMetric("Custom/ActorSystem/activeCount", gauge.getValue.asInstanceOf[Float]) - }*/ - - - def report(gauges: util.SortedMap[String, Gauge[_]], counters: util.SortedMap[String, Counter], histograms: util.SortedMap[String, metrics.Histogram], meters: util.SortedMap[String, Meter], timers: util.SortedMap[String, Timer]) { - //Process Meters - meters.asScala.map{case(name, meter) => processMeter(name, meter)} - - //Process Meters - counters.asScala.map{case(name, counter) => processCounter(name, counter)} - - // Gauges - gauges.asScala.foreach{ case (name, gauge) => { - val measure: Float = gauge.getValue.asInstanceOf[Number].floatValue() - val fullMetricName = "Custom" + name - NewRelic.recordMetric(fullMetricName, measure) - }} - } - - -} - -object NewRelicReporter { - def apply(registry: MetricRegistry) = new NewRelicReporter(registry, "NewRelic-reporter", metrics.MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.MILLISECONDS) -}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/context/TracingAwareContext.scala b/kamon-core/src/main/scala/kamon/trace/context/TracingAwareContext.scala deleted file mode 100644 index c8d0d4f0..00000000 --- a/kamon-core/src/main/scala/kamon/trace/context/TracingAwareContext.scala +++ /dev/null @@ -1,8 +0,0 @@ -package kamon.trace.context - -import kamon.trace.TraceContext - -trait TracingAwareContext { - def traceContext: Option[TraceContext] - def timestamp: Long -}
\ No newline at end of file diff --git a/kamon-core/src/test/scala/ExtraSpec.scala b/kamon-core/src/test/scala/ExtraSpec.scala deleted file mode 100644 index b8dc053d..00000000 --- a/kamon-core/src/test/scala/ExtraSpec.scala +++ /dev/null @@ -1,34 +0,0 @@ -import akka.actor.ActorSystem -import akka.testkit.TestKit -import org.scalatest.WordSpecLike -import shapeless._ - -class ExtraSpec extends TestKit(ActorSystem("ExtraSpec")) with WordSpecLike { - - "the Extra pattern helper" should { - "be constructed from a finite number of types" in { - Extra.expecting[String :: Int :: HNil].as[Person] - } - } - - case class Person(name: String, age: Int) -} - -/** - * Desired Features: - * 1. Expect messages of different types, apply a function and forward to some other. - */ - -object Extra { - def expecting[T <: HList] = new Object { - def as[U <: Product] = ??? - } -} - -/* -extra of { - expect[A] in { actor ! msg} - expect[A] in { actor ! msg} -} as (A, A) pipeTo (z)*/ - - diff --git a/kamon-core/src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala deleted file mode 100644 index 1eab6355..00000000 --- a/kamon-core/src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala +++ /dev/null @@ -1,22 +0,0 @@ -package kamon.instrumentation - -import org.scalatest.{Matchers, WordSpec} -import akka.actor.ActorSystem -import kamon.Kamon - -class ActorSystemInstrumentationSpec extends WordSpec with Matchers { - - // TODO: Selection filters to exclude unwanted actor systems. Read from configuration. - - "the actor system instrumentation" should { - "register all actor systems created" in { - val as1 = ActorSystem("as1") - val as2 = ActorSystem("as2") - - - Kamon.Metric.actorSystem("as1") should not be (None) - Kamon.Metric.actorSystem("as2") should not be (None) - Kamon.Metric.actorSystem("unknown") should be (None) - } - } -} diff --git a/kamon-core/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala deleted file mode 100644 index 89ef61f3..00000000 --- a/kamon-core/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala +++ /dev/null @@ -1,34 +0,0 @@ -package kamon.instrumentation - -import org.scalatest.{Matchers, WordSpec} -import akka.actor.{Actor, Props, ActorSystem} -import kamon.metric.MetricDirectory -import kamon.Kamon - -class DispatcherInstrumentationSpec extends WordSpec with Matchers{ - - - "the dispatcher instrumentation" should { - "instrument a dispatcher that belongs to a non-filtered actor system" in new SingleDispatcherActorSystem { - val x = Kamon.Metric.actorSystem("single-dispatcher").get.dispatchers - (1 to 10).foreach(actor ! _) - - val active = x.get("akka.actor.default-dispatcher").get.activeThreadCount.snapshot - println("Active max: "+active.max) - println("Active min: "+active.min) - - } - } - - - trait SingleDispatcherActorSystem { - val actorSystem = ActorSystem("single-dispatcher") - val actor = actorSystem.actorOf(Props(new Actor { - def receive = { - case a => sender ! a; - } - })) - - } -} - diff --git a/kamon-core/src/test/scala/kamon/instrumentation/MessageQueueInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/MessageQueueInstrumentationSpec.scala deleted file mode 100644 index cc55ec92..00000000 --- a/kamon-core/src/test/scala/kamon/instrumentation/MessageQueueInstrumentationSpec.scala +++ /dev/null @@ -1,53 +0,0 @@ -package kamon.instrumentation - -import org.scalatest.WordSpec -import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram} -import java.util.concurrent.ConcurrentLinkedQueue -import akka.dispatch.{UnboundedMessageQueueSemantics, QueueBasedMessageQueue, Envelope} -import java.util.Queue -import akka.actor.{ActorSystem, Actor} - -class MessageQueueInstrumentationSpec(val actorSystem: ActorSystem) extends WordSpec { - def this() = this(ActorSystem("MessageQueueInstrumentationSpec")) - - - /*"A MonitoredMessageQueue" should { - "update the related histogram when a message is enqueued" in { - new PopulatedMessageQueueFixture { - - assert(histogram.getSnapshot.getMax === 0) - - for(i <- 1 to 3) { enqueueDummyMessage } - - assert(histogram.getCount === 3) - assert(histogram.getSnapshot.getMax === 3) - assert(histogram.getSnapshot.getMin === 1) - } - } - - "update the related histogram when a message is dequeued" in { - new PopulatedMessageQueueFixture { - for(i <- 1 to 3) { enqueueDummyMessage } - assert(histogram.getSnapshot.getMax === 3) - - messageQueue.dequeue() - messageQueue.dequeue() - - assert(histogram.getCount === 5) - assert(histogram.getSnapshot.getMax === 3) - assert(histogram.getSnapshot.getMin === 1) - } - } - } - - trait PopulatedMessageQueueFixture { - - val histogram = new Histogram(new ExponentiallyDecayingReservoir()) -/* val delegate = new ConcurrentLinkedQueue[Envelope]() with QueueBasedMessageQueue with UnboundedMessageQueueSemantics { - final def queue: Queue[Envelope] = this - }*/ - val messageQueue = new MonitoredMessageQueue(delegate, histogram) - - def enqueueDummyMessage = messageQueue.enqueue(Actor.noSender, Envelope("", Actor.noSender, actorSystem)) - }*/ -} diff --git a/kamon-dashboard/src/main/scala/kamon/dashboard/DashboardService.scala b/kamon-dashboard/src/main/scala/kamon/dashboard/DashboardService.scala index 8edbebde..3af7ddca 100644 --- a/kamon-dashboard/src/main/scala/kamon/dashboard/DashboardService.scala +++ b/kamon-dashboard/src/main/scala/kamon/dashboard/DashboardService.scala @@ -26,7 +26,7 @@ trait DashboardService extends HttpService with StaticResources with DashboardPa val DashboardRoute = logRequest(showPath _) { - staticResources ~ dashboardPages ~ dashboardMetricsApi + staticResources ~ dashboardPages //~ dashboardMetricsApi } } @@ -47,8 +47,7 @@ trait DashboardPages extends HttpService { trait DashboardMetricsApi extends HttpService with SprayJsonSupport{ - import Kamon.Metric._ - import scala.collection.JavaConverters._ + /*import scala.collection.JavaConverters._ import kamon.metric.Metrics._ import kamon.dashboard.protocol.DashboardProtocols._ @@ -86,5 +85,5 @@ trait DashboardMetricsApi extends HttpService with SprayJsonSupport{ complete (ActorTree("/", ActorTree("Pang", ActorTree("Pang-children") :: Nil) :: ActorTree("Ping") :: ActorTree("Pong", ActorTree("Pong-children") :: Nil):: Nil)) } } - } + }*/ }
\ No newline at end of file diff --git a/kamon-metrics/src/main/scala/kamon/Metrics.scala b/kamon-metrics/src/main/scala/kamon/Metrics.scala index c3aedfd4..405d8b09 100644 --- a/kamon-metrics/src/main/scala/kamon/Metrics.scala +++ b/kamon-metrics/src/main/scala/kamon/Metrics.scala @@ -8,7 +8,7 @@ import akka.actor object Metrics extends ExtensionId[MetricsExtension] with ExtensionIdProvider { def lookup(): ExtensionId[_ <: actor.Extension] = Metrics - def createExtension(system: ExtendedActorSystem): Extension = new MetricsExtension(system) + def createExtension(system: ExtendedActorSystem): MetricsExtension = new MetricsExtension(system) } diff --git a/kamon-metrics/src/test/scala/kamon/MailboxSizeMetricsSpec.scala b/kamon-metrics/src/test/scala/kamon/MailboxSizeMetricsSpec.scala index 206c58e1..b090988e 100644 --- a/kamon-metrics/src/test/scala/kamon/MailboxSizeMetricsSpec.scala +++ b/kamon-metrics/src/test/scala/kamon/MailboxSizeMetricsSpec.scala @@ -10,7 +10,7 @@ class MailboxSizeMetricsSpec extends TestKit(ActorSystem("mailbox-size-metrics-s "register a counter for mailbox size upon actor creation" in { val target = system.actorOf(Props.empty, "sample") - Metrics.registry.getHistograms.get("akka://mailbox-size-metrics-spec/sample:MAILBOX") + //Metrics.registry.getHistograms.get("akka://mailbox-size-metrics-spec/sample:MAILBOX") } } } diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-newrelic/src/main/resources/reference.conf index 29532595..a2583195 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-newrelic/src/main/resources/reference.conf @@ -1,4 +1,9 @@ akka { + actor { + debug { + unhandled = on + } + } loggers = ["kamon.newrelic.NewRelicErrorLogger", "akka.event.slf4j.Slf4jLogger"] } diff --git a/kamon-core/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala index 4bc49496..4bc49496 100644 --- a/kamon-core/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala diff --git a/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicReporting.scala index 106f27e2..106f27e2 100644 --- a/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicReporting.scala diff --git a/kamon-core/src/main/resources/logback.xml b/kamon-playground/src/main/resources/logback.xml index 2ae1e3bd..2ae1e3bd 100644 --- a/kamon-core/src/main/resources/logback.xml +++ b/kamon-playground/src/main/resources/logback.xml diff --git a/kamon-core/src/main/scala/test/SimpleRequestProcessor.scala b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala index da3c6c6a..7ee92580 100644 --- a/kamon-core/src/main/scala/test/SimpleRequestProcessor.scala +++ b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala @@ -1,12 +1,12 @@ package test import akka.actor._ -import kamon.Tracer import spray.routing.SimpleRoutingApp import akka.util.Timeout import spray.httpx.RequestBuilding import scala.concurrent.{Await, Future} -import kamon.trace.UowDirectives +import kamon.spray.UowDirectives +import kamon.trace.Trace object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuilding with UowDirectives { import scala.concurrent.duration._ @@ -39,7 +39,7 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil path("reply" / Segment) { reqID => uow { complete { - if (Tracer.context().isEmpty) + if (Trace.context().isEmpty) println("ROUTE NO CONTEXT") (replier ? reqID).mapTo[String] @@ -92,7 +92,7 @@ object Verifier extends App { class Replier extends Actor with ActorLogging { def receive = { case anything => - if(Tracer.context.isEmpty) + if(Trace.context.isEmpty) log.warning("PROCESSING A MESSAGE WITHOUT CONTEXT") log.info("Processing at the Replier") diff --git a/kamon-spray/src/main/resources/META-INF/aop.xml b/kamon-spray/src/main/resources/META-INF/aop.xml new file mode 100644 index 00000000..afbbb8c0 --- /dev/null +++ b/kamon-spray/src/main/resources/META-INF/aop.xml @@ -0,0 +1,10 @@ +<!DOCTYPE aspectj PUBLIC "-//AspectJ//DTD//EN" "http://www.eclipse.org/aspectj/dtd/aspectj.dtd"> + +<aspectj> + <weaver options="-verbose -showWeaveInfo"/> + + <aspects> + <aspect name="spray.can.server.ServerRequestTracing"/> + <include within="spray..*"/> + </aspects> +</aspectj> diff --git a/kamon-spray/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala b/kamon-spray/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala index 743769e2..08cb53ff 100644 --- a/kamon-spray/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala +++ b/kamon-spray/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala @@ -1,61 +1,28 @@ -package kamon.instrumentation +package spray.can.server import org.aspectj.lang.annotation._ import org.aspectj.lang.ProceedingJoinPoint import spray.http.HttpRequest import spray.http.HttpHeaders.Host +import kamon.trace.{TraceContext, Trace, ContextAware, TimedContextAware} //import spray.can.client.HttpHostConnector.RequestContext -trait ContextAware { - def traceContext: Option[TraceContext] -} -trait TimedContextAware { - def timestamp: Long - def traceContext: Option[TraceContext] -} @Aspect class SprayOpenRequestContextTracing { - @DeclareMixin("spray.can.server.OpenRequestComponent.DefaultOpenRequest") - def mixinContextAwareToOpenRequest: ContextAware = new ContextAware { - val traceContext: Option[TraceContext] = Tracer.traceContext.value - } @DeclareMixin("spray.can.client.HttpHostConnector.RequestContext") def mixinContextAwareToRequestContext: TimedContextAware = new TimedContextAware { val timestamp: Long = System.nanoTime() - val traceContext: Option[TraceContext] = Tracer.traceContext.value + val traceContext: Option[TraceContext] = Trace.context() } } @Aspect class SprayServerInstrumentation { - @Pointcut("execution(spray.can.server.OpenRequestComponent$DefaultOpenRequest.new(..)) && this(openRequest) && args(*, request, *, *)") - def openRequestInit(openRequest: ContextAware, request: HttpRequest): Unit = {} - - @After("openRequestInit(openRequest, request)") - def afterInit(openRequest: ContextAware, request: HttpRequest): Unit = { - Tracer.start - openRequest.traceContext - - Tracer.context().map(_.tracer ! Rename(request.uri.path.toString())) - } - - @Pointcut("execution(* spray.can.server.OpenRequestComponent$DefaultOpenRequest.handleResponseEndAndReturnNextOpenRequest(..)) && target(openRequest)") - def openRequestCreation(openRequest: ContextAware): Unit = {} - - @After("openRequestCreation(openRequest)") - def afterFinishingRequest(openRequest: ContextAware): Unit = { - val original = openRequest.traceContext - Tracer.context().map(_.tracer ! Finish()) - - if(Tracer.context() != original) { - println(s"OMG DIFFERENT Original: [${original}] - Came in: [${Tracer.context}]") - } - } @Pointcut("execution(spray.can.client.HttpHostConnector.RequestContext.new(..)) && this(ctx) && args(request, *, *, *)") def requestRecordInit(ctx: TimedContextAware, request: HttpRequest): Unit = {} @@ -63,10 +30,10 @@ class SprayServerInstrumentation { @After("requestRecordInit(ctx, request)") def whenCreatedRequestRecord(ctx: TimedContextAware, request: HttpRequest): Unit = { // Necessary to force the initialization of TracingAwareRequestContext at the moment of creation. - for{ + /*for{ tctx <- ctx.traceContext host <- request.header[Host] - } tctx.tracer ! WebExternalStart(ctx.timestamp, host.host) + } tctx.tracer ! WebExternalStart(ctx.timestamp, host.host)*/ } @@ -78,12 +45,12 @@ class SprayServerInstrumentation { def aroundDispatchToCommander(pjp: ProceedingJoinPoint, requestContext: TimedContextAware, message: Any) = { println("Completing the request with context: " + requestContext.traceContext) - Tracer.traceContext.withValue(requestContext.traceContext) { + /*Tracer.context.withValue(requestContext.traceContext) { requestContext.traceContext.map { - tctx => tctx.tracer ! WebExternalFinish(requestContext.timestamp) + tctx => //tctx.tracer ! WebExternalFinish(requestContext.timestamp) } pjp.proceed() - } + }*/ } @@ -94,17 +61,17 @@ class SprayServerInstrumentation { @Around("copyingRequestContext(old)") def aroundCopyingRequestContext(pjp: ProceedingJoinPoint, old: TimedContextAware) = { println("Instrumenting the request context copy.") - Tracer.traceContext.withValue(old.traceContext) { + /*Tracer.traceContext.withValue(old.traceContext) { pjp.proceed() - } + }*/ } } -case class DefaultTracingAwareRequestContext(traceContext: Option[TraceContext] = Tracer.context(), timestamp: Long = System.nanoTime) extends TracingAwareContext - @Aspect class SprayRequestContextTracing { @DeclareMixin("spray.can.client.HttpHostConnector.RequestContext") - def mixin: TracingAwareContext = DefaultTracingAwareRequestContext() + def mixin: ContextAware = new ContextAware { + val traceContext: Option[TraceContext] = Trace.context() + } }
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/UowDirectives.scala b/kamon-spray/src/main/scala/kamon/spray/UowDirectives.scala index 392f53b8..6f913a67 100644 --- a/kamon-core/src/main/scala/kamon/trace/UowDirectives.scala +++ b/kamon-spray/src/main/scala/kamon/spray/UowDirectives.scala @@ -1,18 +1,19 @@ -package kamon.trace +package kamon.spray import spray.routing.directives.BasicDirectives import spray.routing._ -import kamon.Tracer import java.util.concurrent.atomic.AtomicLong import scala.util.Try import java.net.InetAddress +import kamon.trace.Trace trait UowDirectives extends BasicDirectives { def uow: Directive0 = mapRequest { request => val uowHeader = request.headers.find(_.name == "X-UOW") val generatedUow = uowHeader.map(_.value).getOrElse(UowDirectives.newUow) - Tracer.set(Tracer.context().getOrElse(Tracer.newTraceContext()).copy(uow = generatedUow)) + // TODO: Tracer will always have a context at this point, just rename the uow. + //Tracer.set(Tracer.context().getOrElse(Tracer.newTraceContext()).copy(uow = generatedUow)) request } @@ -20,9 +21,7 @@ trait UowDirectives extends BasicDirectives { object UowDirectives { val uowCounter = new AtomicLong - val hostnamePrefix = Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-localhost") - def newUow = "%s-%s".format(hostnamePrefix, uowCounter.incrementAndGet()) }
\ No newline at end of file diff --git a/kamon-spray/src/main/scala/spray/can/server/ServerRequestTracing.scala b/kamon-spray/src/main/scala/spray/can/server/ServerRequestTracing.scala new file mode 100644 index 00000000..d5e21f35 --- /dev/null +++ b/kamon-spray/src/main/scala/spray/can/server/ServerRequestTracing.scala @@ -0,0 +1,56 @@ +package spray.can.server + +import org.aspectj.lang.annotation.{After, Pointcut, DeclareMixin, Aspect} +import kamon.trace.{Trace, TraceContext, ContextAware} +import spray.http.HttpRequest +import akka.actor.ActorSystem +import akka.event.Logging.Warning + + +@Aspect +class ServerRequestTracing { + + @DeclareMixin("spray.can.server.OpenRequestComponent.DefaultOpenRequest") + def mixinContextAwareToOpenRequest: ContextAware = ContextAware.default + + + @Pointcut("execution(spray.can.server.OpenRequestComponent$DefaultOpenRequest.new(..)) && this(openRequest) && args(*, request, *, *)") + def openRequestInit(openRequest: ContextAware, request: HttpRequest): Unit = {} + + @After("openRequestInit(openRequest, request)") + def afterInit(openRequest: ContextAware, request: HttpRequest): Unit = { + val system: ActorSystem = openRequest.asInstanceOf[OpenRequest].context.actorContext.system + val defaultTraceName: String = request.method.value + ": " + request.uri.path + + Trace.start(defaultTraceName)(system) + + // Necessary to force initialization of traceContext when initiating the request. + openRequest.traceContext + } + + @Pointcut("execution(* spray.can.server.OpenRequestComponent$DefaultOpenRequest.handleResponseEndAndReturnNextOpenRequest(..)) && target(openRequest)") + def openRequestCreation(openRequest: ContextAware): Unit = {} + + @After("openRequestCreation(openRequest)") + def afterFinishingRequest(openRequest: ContextAware): Unit = { + val storedContext = openRequest.traceContext + val incomingContext = Trace.finish() + + for(original <- storedContext) { + incomingContext match { + case Some(incoming) if original.id != incoming.id => + publishWarning(s"Different ids when trying to close a Trace, original: [$original] - incoming: [$incoming]") + + case Some(_) => // nothing to do here. + + case None => + publishWarning(s"Trace context not present while closing the Trace: [$original]") + } + } + + def publishWarning(text: String): Unit = { + val system: ActorSystem = openRequest.asInstanceOf[OpenRequest].context.actorContext.system + system.eventStream.publish(Warning("", classOf[ServerRequestTracing], text)) + } + } +} diff --git a/kamon-spray/src/test/scala/kamon/ServerRequestTracingSpec.scala b/kamon-spray/src/test/scala/kamon/ServerRequestTracingSpec.scala new file mode 100644 index 00000000..4cff38be --- /dev/null +++ b/kamon-spray/src/test/scala/kamon/ServerRequestTracingSpec.scala @@ -0,0 +1,49 @@ +package kamon + +import _root_.spray.httpx.RequestBuilding +import _root_.spray.routing.SimpleRoutingApp +import akka.testkit.TestKit +import akka.actor.{ActorRef, ActorSystem} +import org.scalatest.WordSpecLike +import scala.concurrent.Await +import scala.concurrent.duration._ +import _root_.spray.client.pipelining._ +import akka.util.Timeout +import kamon.trace.Trace +import kamon.Kamon.Extension +import kamon.trace.UowTracing.{Finish, Start} + +class ServerRequestTracingSpec extends TestKit(ActorSystem("server-request-tracing-spec")) with WordSpecLike with RequestBuilding { + + "the spray server request tracing instrumentation" should { + "start tracing a request when entering the server and close it when responding" in new TestServer { + client(Get(s"http://127.0.0.1:$port/")) + + within(5 seconds) { + val traceId = expectMsgPF() { case Start(id) => id} + expectMsgPF() { case Finish(traceId) => } + } + } + } + + + + trait TestServer extends SimpleRoutingApp { + + // Nasty, but very helpful for tests. + AkkaExtensionSwap.swap(system, Trace, new Extension { + def manager: ActorRef = testActor + }) + + implicit val timeout = Timeout(20 seconds) + val port: Int = Await.result( + startServer(interface = "127.0.0.1", port = 0)( + get { + complete("ok") + } + ), timeout.duration).localAddress.getPort + + val client = sendReceive(system, system.dispatcher, timeout) + + } +} diff --git a/kamon-trace/src/main/scala/kamon/trace/Tracer.scala b/kamon-trace/src/main/scala/kamon/trace/Trace.scala index 4ea89850..232b7420 100644 --- a/kamon-trace/src/main/scala/kamon/trace/Tracer.scala +++ b/kamon-trace/src/main/scala/kamon/trace/Trace.scala @@ -6,6 +6,7 @@ import akka.actor._ import scala.Some import kamon.trace.Trace.Register import scala.concurrent.duration._ +import java.util.concurrent.atomic.AtomicLong object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider { def lookup(): ExtensionId[_ <: Extension] = Trace @@ -14,10 +15,31 @@ object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider { /*** Protocol */ case object Register + + + + /** User API */ + private[trace] val traceContext = new DynamicVariable[Option[TraceContext]](None) + private[trace] val tranid = new AtomicLong() + + + def context() = traceContext.value + def set(ctx: TraceContext) = traceContext.value = Some(ctx) + + def start(name: String)(implicit system: ActorSystem) = set(newTraceContext) + + def finish(): Option[TraceContext] = { + val ctx = context() + ctx.map(_.finish) + ctx + } + + // TODO: FIX + def newTraceContext()(implicit system: ActorSystem): TraceContext = TraceContext(Kamon(Trace), tranid.getAndIncrement) } class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension { - def manager: ActorRef = ??? + def manager: ActorRef = system.actorOf(Props[TraceManager]) } class TraceManager extends Actor { @@ -35,15 +57,3 @@ class TraceManager extends Actor { listeners foreach(_ ! trace) } } - - -object Tracer { - val traceContext = new DynamicVariable[Option[TraceContext]](None) - - - def context() = traceContext.value - def set(ctx: TraceContext) = traceContext.value = Some(ctx) - - def start = set(newTraceContext) - def newTraceContext(): TraceContext = TraceContext()(Kamon.actorSystem) -} diff --git a/kamon-trace/src/main/scala/kamon/trace/TraceContext.scala b/kamon-trace/src/main/scala/kamon/trace/TraceContext.scala index c3f1f2c2..f8491c12 100644 --- a/kamon-trace/src/main/scala/kamon/trace/TraceContext.scala +++ b/kamon-trace/src/main/scala/kamon/trace/TraceContext.scala @@ -4,56 +4,32 @@ import java.util.UUID import akka.actor._ import java.util.concurrent.atomic.AtomicLong import scala.concurrent.duration._ +import kamon.Kamon +import kamon.trace.UowTracing.{Finish, Start} // TODO: Decide if we need or not an ID, generating it takes time and it doesn't seem necessary. -case class TraceContext(id: Long, tracer: ActorRef, uow: String = "", userContext: Option[Any] = None) +protected[kamon] case class TraceContext(private val collector: ActorRef, id: Long, uow: String = "", userContext: Option[Any] = None) { + collector ! Start(id) -object TraceContext { - - def apply()(implicit system: ActorSystem) = { - val n = traceIdCounter.incrementAndGet() - val actor = system.actorOf(UowTraceAggregator.props(reporter, 30 seconds), s"tracer-${n}") - actor ! Start() - - new TraceContext(n, actor) // TODO: Move to a kamon specific supervisor, like /user/kamon/tracer + def finish: Unit = { + collector ! Finish(id) } -} - -class TraceAccumulator extends Actor { - def receive = { - case a => println("Trace Accumulated: "+a) - } } -trait TraceEntry -case class CodeBlockExecutionTime(name: String, begin: Long, end: Long) extends TraceEntry -case class TransactionTrace(id: UUID, start: Long, end: Long, entries: Seq[TraceEntry]) - -object Collector - -trait TraceEntryStorage { - def store(entry: TraceEntry): Boolean +trait ContextAware { + def traceContext: Option[TraceContext] } -class TransactionContext(val id: UUID, private val storage: TraceEntryStorage) { - def store(entry: TraceEntry) = storage.store(entry) -} - -object ThreadLocalTraceEntryStorage extends TraceEntryStorage { - - private val storage = new ThreadLocal[List[TraceEntry]] { - override def initialValue(): List[TraceEntry] = Nil - } - - def update(f: List[TraceEntry] => List[TraceEntry]) = storage set f(storage.get) - - def store(entry: TraceEntry): Boolean = { - update(entry :: _) - true +object ContextAware { + def default: ContextAware = new ContextAware { + val traceContext: Option[TraceContext] = Trace.context() } } - +trait TimedContextAware { + def timestamp: Long + def traceContext: Option[TraceContext] +} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingInstrumentation.scala b/kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorLoggingInstrumentation.scala index 9b53bd5d..77993cdd 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingInstrumentation.scala +++ b/kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorLoggingInstrumentation.scala @@ -1,10 +1,9 @@ -package kamon.instrumentation +package kamon.trace.instrumentation import org.aspectj.lang.annotation.{Around, Pointcut, DeclareMixin, Aspect} -import kamon.{Tracer} import org.aspectj.lang.ProceedingJoinPoint import org.slf4j.MDC -import kamon.trace.TraceContext +import kamon.trace.{TraceContext, ContextAware, Trace} @Aspect class ActorLoggingInstrumentation { @@ -12,7 +11,7 @@ class ActorLoggingInstrumentation { @DeclareMixin("akka.event.Logging.LogEvent+") def traceContextMixin: ContextAware = new ContextAware { - def traceContext: Option[TraceContext] = Tracer.context() + def traceContext: Option[TraceContext] = Trace.context() } @Pointcut("execution(* akka.event.slf4j.Slf4jLogger.withMdc(..)) && args(logSource, logEvent, logStatement)") diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorRefTellInstrumentation.scala index 9b5ce0a4..3caba77c 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala +++ b/kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorRefTellInstrumentation.scala @@ -1,17 +1,14 @@ -package kamon.instrumentation +package kamon.trace.instrumentation import org.aspectj.lang.annotation._ import org.aspectj.lang.ProceedingJoinPoint import akka.actor.{Props, ActorSystem, ActorRef} -import kamon.{Tracer} import akka.dispatch.{Envelope, MessageDispatcher} import com.codahale.metrics.Timer -import scala.Some -import kamon.trace.context.TracingAwareContext -import kamon.trace.TraceContext +import kamon.trace.{ContextAware, TraceContext, Trace} case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context) -case class DefaultTracingAwareEnvelopeContext(traceContext: Option[TraceContext] = Tracer.traceContext.value, timestamp: Long = System.nanoTime) extends TracingAwareContext +case class DefaultTracingAwareEnvelopeContext(traceContext: Option[TraceContext] = Trace.traceContext.value, timestamp: Long = System.nanoTime) extends ContextAware @Aspect class ActorCellInvokeInstrumentation { @@ -25,9 +22,9 @@ class ActorCellInvokeInstrumentation { @Around("invokingActorBehaviourAtActorCell(envelope)") def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = { //safe cast - val msgContext = envelope.asInstanceOf[TracingAwareContext].traceContext + val msgContext = envelope.asInstanceOf[ContextAware].traceContext - Tracer.traceContext.withValue(msgContext) { + Trace.traceContext.withValue(msgContext) { pjp.proceed() } } @@ -37,13 +34,15 @@ class ActorCellInvokeInstrumentation { class EnvelopeTracingContext { @DeclareMixin("akka.dispatch.Envelope") - def mixin: TracingAwareContext = DefaultTracingAwareEnvelopeContext() + def mixin: ContextAware = new ContextAware { + val traceContext: Option[TraceContext] = Trace.context() + } - @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)") - def requestRecordInit(ctx: TracingAwareContext): Unit = {} + @Pointcut("execution(akka.dispatch.ContextAware.new(..)) && this(ctx)") + def requestRecordInit(ctx: ContextAware): Unit = {} @After("requestRecordInit(ctx)") - def whenCreatedRequestRecord(ctx: TracingAwareContext): Unit = { + def whenCreatedRequestRecord(ctx: ContextAware): Unit = { // Necessary to force the initialization of TracingAwareRequestContext at the moment of creation. ctx.traceContext } diff --git a/kamon-trace/src/main/scala/kamon/trace/instrumentation/RunnableTracing.scala b/kamon-trace/src/main/scala/kamon/trace/instrumentation/RunnableTracing.scala index 236fd4fc..3e5a7cce 100644 --- a/kamon-trace/src/main/scala/kamon/trace/instrumentation/RunnableTracing.scala +++ b/kamon-trace/src/main/scala/kamon/trace/instrumentation/RunnableTracing.scala @@ -2,7 +2,7 @@ package kamon.trace.instrumentation import org.aspectj.lang.annotation._ import org.aspectj.lang.ProceedingJoinPoint -import kamon.trace.TraceContext +import kamon.trace.{TraceContext, Trace} @Aspect class RunnableTracing { @@ -13,7 +13,7 @@ class RunnableTracing { */ @DeclareMixin("scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable") def onCompleteCallbacksRunnable: TraceContextAwareRunnable = new TraceContextAwareRunnable { - val traceContext: Option[TraceContext] = Tracer.traceContext.value + val traceContext: Option[TraceContext] = Trace.traceContext.value } @@ -40,7 +40,7 @@ class RunnableTracing { def around(pjp: ProceedingJoinPoint, runnable: TraceContextAwareRunnable): Any = { import pjp._ - Tracer.traceContext.withValue(runnable.traceContext) { + Trace.traceContext.withValue(runnable.traceContext) { proceed() } } diff --git a/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala b/kamon-trace/src/test/scala/kamon/ActorInstrumentationSpec.scala index cdfa2813..f5d88f06 100644 --- a/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala +++ b/kamon-trace/src/test/scala/kamon/ActorInstrumentationSpec.scala @@ -1,10 +1,10 @@ -package akka.instrumentation +package kamon import org.scalatest.{WordSpecLike, Matchers} import akka.actor.{ActorRef, Actor, Props, ActorSystem} import akka.testkit.{ImplicitSender, TestKit} -import kamon.{Tracer} +import kamon.trace.Trace import akka.pattern.{pipe, ask} import akka.util.Timeout import scala.concurrent.duration._ @@ -65,18 +65,18 @@ class ActorInstrumentationSpec extends TestKit(ActorSystem("ActorInstrumentation } trait TraceContextEchoFixture { - val testTraceContext = Tracer.newTraceContext() + val testTraceContext = Trace.newTraceContext() val echo = system.actorOf(Props[TraceContextEcho]) - Tracer.set(testTraceContext) + Trace.set(testTraceContext) } trait RoutedTraceContextEchoFixture extends TraceContextEchoFixture { override val echo = system.actorOf(Props[TraceContextEcho].withRouter(RoundRobinRouter(nrOfInstances = 10))) def tellWithNewContext(target: ActorRef, message: Any): TraceContext = { - val context = Tracer.newTraceContext() - Tracer.set(context) + val context = Trace.newTraceContext() + Trace.set(context) target ! message context @@ -87,7 +87,7 @@ class ActorInstrumentationSpec extends TestKit(ActorSystem("ActorInstrumentation class TraceContextEcho extends Actor { def receive = { - case msg: String ⇒ sender ! Tracer.context() + case msg: String ⇒ sender ! Trace.context() } } diff --git a/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala b/kamon-trace/src/test/scala/kamon/RunnableInstrumentationSpec.scala index 570f64dd..f968fa83 100644 --- a/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala +++ b/kamon-trace/src/test/scala/kamon/RunnableInstrumentationSpec.scala @@ -1,15 +1,14 @@ -package kamon.instrumentation +package kamon import scala.concurrent.{Await, Promise, Future} import org.scalatest.{Matchers, OptionValues, WordSpec} import org.scalatest.concurrent.{ScalaFutures, PatienceConfiguration} -import kamon.{Tracer, Kamon} import java.util.UUID import scala.util.Success import scala.concurrent.duration._ import java.util.concurrent.TimeUnit -import akka.actor.ActorSystem -import kamon.trace.TraceContext +import akka.actor.{Actor, ActorSystem} +import kamon.trace.{Trace, TraceContext} class RunnableInstrumentationSpec extends WordSpec with Matchers with ScalaFutures with PatienceConfiguration with OptionValues { @@ -19,20 +18,20 @@ class RunnableInstrumentationSpec extends WordSpec with Matchers with ScalaFutur "preserve the TraceContext" which { "should be available during the run method execution" in new FutureWithContextFixture { - whenReady(futureWithContext) { result => +/* whenReady(futureWithContext) { result => result.value should equal(testContext) - } + }*/ } "should be available during the execution of onComplete callbacks" in new FutureWithContextFixture { val onCompleteContext = Promise[Option[TraceContext]]() - Tracer.traceContext.withValue(Some(testContext)) { +/* Tracer.traceContext.withValue(Some(testContext)) { futureWithContext.onComplete({ case _ => println("Completing second promise from: "+Thread.currentThread().getName + " With Context: " + Tracer.traceContext.value); onCompleteContext.complete(Success(Tracer.traceContext.value)) }) - } + }*/ whenReady(onCompleteContext.future) { result => result should equal(Some(testContext)) @@ -52,7 +51,7 @@ class RunnableInstrumentationSpec extends WordSpec with Matchers with ScalaFutur val onCompleteContext = Promise[Option[TraceContext]]() futureWithoutContext.onComplete { - case _ => onCompleteContext.complete(Success(Tracer.traceContext.value)) + case _ => onCompleteContext.complete(Success(Trace.context())) } whenReady(onCompleteContext.future) { result => @@ -70,16 +69,16 @@ class RunnableInstrumentationSpec extends WordSpec with Matchers with ScalaFutur implicit val execContext = testActorSystem.dispatcher class FutureWithContextFixture { - val testContext = TraceContext() + val testContext = TraceContext(Actor.noSender, 1) - var futureWithContext: Future[Option[TraceContext]] = _ - Tracer.traceContext.withValue(Some(testContext)) { +/* var futureWithContext: Future[Option[TraceContext]] = _ + Tracer.context.withValue(Some(testContext)) { futureWithContext = Future { Tracer.traceContext.value } - } + }*/ } trait FutureWithoutContextFixture { - val futureWithoutContext = Future { Tracer.traceContext.value } + val futureWithoutContext = Future { Trace.context.value } } } diff --git a/kamon-core/src/test/scala/kamon/trace/TraceAggregatorSpec.scala b/kamon-trace/src/test/scala/kamon/TraceAggregatorSpec.scala index 60b5f06d..a8e736ae 100644 --- a/kamon-core/src/test/scala/kamon/trace/TraceAggregatorSpec.scala +++ b/kamon-trace/src/test/scala/kamon/TraceAggregatorSpec.scala @@ -1,30 +1,31 @@ -package kamon.trace +package kamon import org.scalatest.{WordSpecLike, WordSpec} import akka.testkit.{TestKitBase, TestKit} import akka.actor.ActorSystem import scala.concurrent.duration._ import kamon.trace.UowTracing.{Finish, Rename, Start} +import kamon.trace.{UowTrace, UowTraceAggregator} class TraceAggregatorSpec extends TestKit(ActorSystem("TraceAggregatorSpec")) with WordSpecLike { "a TraceAggregator" should { "send a UowTrace message out after receiving a Finish message" in new AggregatorFixture { within(1 second) { - aggregator ! Start() - aggregator ! Finish() + aggregator ! Start(1) + aggregator ! Finish(1) - expectMsg(UowTrace("UNKNOWN", Seq(Start(), Finish()))) + expectMsg(UowTrace("UNKNOWN", Seq(Start(1), Finish(1)))) } } "change the uow name after receiving a Rename message" in new AggregatorFixture { within(1 second) { - aggregator ! Start() - aggregator ! Rename("test-uow") - aggregator ! Finish() + aggregator ! Start(1) + aggregator ! Rename(1, "test-uow") + aggregator ! Finish(1) - expectMsg(UowTrace("test-uow", Seq(Start(), Finish()))) + expectMsg(UowTrace("test-uow", Seq(Start(1), Finish(1)))) } } } diff --git a/project/AspectJ.scala b/project/AspectJ.scala index a1cc27e0..6118aa03 100644 --- a/project/AspectJ.scala +++ b/project/AspectJ.scala @@ -8,8 +8,8 @@ object AspectJ { lazy val aspectJSettings = aspectjSettings ++ Seq( compileOnly in Aspectj := true, - fork in (Test, run) := true, - javaOptions in (Test, run) <++= weaverOptions in Aspectj, + fork in Test := true, + javaOptions in Test <++= weaverOptions in Aspectj, lintProperties in Aspectj += "invalidAbsoluteTypeName = ignore" ) }
\ No newline at end of file diff --git a/project/Build.scala b/project/Build.scala index acdd1791..4f5c758a 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -69,7 +69,7 @@ object Build extends Build { .settings( libraryDependencies ++= compile(akkaActor, akkaSlf4j, sprayCan, sprayClient, sprayRouting, logback)) - .dependsOn(kamonCore) + .dependsOn(kamonSpray, kamonNewrelic) |