From 2b63540e5fffab545d0846cfb3dab5c0e1d0c9e1 Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Tue, 5 Nov 2013 18:38:39 -0300 Subject: basic separation of concerns between sub-projects --- kamon-core/src/main/resources/META-INF/aop.xml | 37 -------- .../META-INF/aop_remove_when_possible.xml | 37 ++++++++ kamon-core/src/main/resources/logback.xml | 12 --- kamon-core/src/main/resources/reference.conf | 11 --- .../src/main/scala/kamon/AkkaExtensionSwap.scala | 14 +++ kamon-core/src/main/scala/kamon/Kamon.scala | 13 --- .../instrumentation/ActorInstrumentation.scala | 42 --------- .../ActorLoggingInstrumentation.scala | 32 ------- .../ActorRefTellInstrumentation.scala | 50 ---------- .../instrumentation/ExecutorServiceMetrics.scala | 6 +- .../main/scala/kamon/metric/NewRelicReporter.scala | 51 ----------- .../scala/kamon/newrelic/NewRelicErrorLogger.scala | 18 ---- .../scala/kamon/newrelic/NewRelicReporting.scala | 52 ----------- .../src/main/scala/kamon/trace/UowDirectives.scala | 28 ------ .../kamon/trace/context/TracingAwareContext.scala | 8 -- .../main/scala/test/SimpleRequestProcessor.scala | 101 --------------------- kamon-core/src/test/scala/ExtraSpec.scala | 34 ------- .../instrumentation/ActorInstrumentationSpec.scala | 94 ------------------- .../ActorSystemInstrumentationSpec.scala | 22 ----- .../DispatcherInstrumentationSpec.scala | 34 ------- .../MessageQueueInstrumentationSpec.scala | 53 ----------- .../RunnableInstrumentationSpec.scala | 86 ------------------ .../scala/kamon/trace/TraceAggregatorSpec.scala | 36 -------- 23 files changed, 54 insertions(+), 817 deletions(-) delete mode 100644 kamon-core/src/main/resources/META-INF/aop.xml create mode 100644 kamon-core/src/main/resources/META-INF/aop_remove_when_possible.xml delete mode 100644 kamon-core/src/main/resources/logback.xml delete mode 100644 kamon-core/src/main/resources/reference.conf create mode 100644 kamon-core/src/main/scala/kamon/AkkaExtensionSwap.scala delete mode 100644 kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala delete mode 100644 kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingInstrumentation.scala delete mode 100644 kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala delete mode 100644 kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala delete mode 100644 kamon-core/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala delete mode 100644 kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala delete mode 100644 kamon-core/src/main/scala/kamon/trace/UowDirectives.scala delete mode 100644 kamon-core/src/main/scala/kamon/trace/context/TracingAwareContext.scala delete mode 100644 kamon-core/src/main/scala/test/SimpleRequestProcessor.scala delete mode 100644 kamon-core/src/test/scala/ExtraSpec.scala delete mode 100644 kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala delete mode 100644 kamon-core/src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala delete mode 100644 kamon-core/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala delete mode 100644 kamon-core/src/test/scala/kamon/instrumentation/MessageQueueInstrumentationSpec.scala delete mode 100644 kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala delete mode 100644 kamon-core/src/test/scala/kamon/trace/TraceAggregatorSpec.scala (limited to 'kamon-core') diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop.xml deleted file mode 100644 index c63e17e5..00000000 --- a/kamon-core/src/main/resources/META-INF/aop.xml +++ /dev/null @@ -1,37 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/kamon-core/src/main/resources/META-INF/aop_remove_when_possible.xml b/kamon-core/src/main/resources/META-INF/aop_remove_when_possible.xml new file mode 100644 index 00000000..207bf1b9 --- /dev/null +++ b/kamon-core/src/main/resources/META-INF/aop_remove_when_possible.xml @@ -0,0 +1,37 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/kamon-core/src/main/resources/logback.xml b/kamon-core/src/main/resources/logback.xml deleted file mode 100644 index 2ae1e3bd..00000000 --- a/kamon-core/src/main/resources/logback.xml +++ /dev/null @@ -1,12 +0,0 @@ - - - - %date{HH:mm:ss.SSS} %-5level [%X{uow}][%X{requestId}] [%thread] %logger{55} - %msg%n - - - - - - - - diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf deleted file mode 100644 index 29532595..00000000 --- a/kamon-core/src/main/resources/reference.conf +++ /dev/null @@ -1,11 +0,0 @@ -akka { - loggers = ["kamon.newrelic.NewRelicErrorLogger", "akka.event.slf4j.Slf4jLogger"] -} - - - - - - - - diff --git a/kamon-core/src/main/scala/kamon/AkkaExtensionSwap.scala b/kamon-core/src/main/scala/kamon/AkkaExtensionSwap.scala new file mode 100644 index 00000000..3d503d54 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/AkkaExtensionSwap.scala @@ -0,0 +1,14 @@ +package kamon + +import akka.actor.{ActorSystem, ExtensionId} +import java.util.concurrent.ConcurrentHashMap + +object AkkaExtensionSwap { + def swap(system: ActorSystem, key: ExtensionId[_], value: Kamon.Extension): Unit = { + val extensionsField = system.getClass.getDeclaredField("extensions") + extensionsField.setAccessible(true) + + val extensions = extensionsField.get(system).asInstanceOf[ConcurrentHashMap[ExtensionId[_], AnyRef]] + extensions.put(key, value) + } +} diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index 8c934f60..5d0d77a3 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -1,16 +1,6 @@ package kamon import akka.actor._ -import kamon.metric.{HistogramSnapshot, ActorSystemMetrics} -import scala.concurrent.duration.FiniteDuration -import scala.collection.concurrent.TrieMap -import kamon.instrumentation.{SimpleContextPassingInstrumentation, ActorInstrumentationConfiguration} -import kamon.metric.ActorSystemMetrics - - -object Instrument { - val instrumentation: ActorInstrumentationConfiguration = new SimpleContextPassingInstrumentation -} object Kamon { trait Extension extends akka.actor.Extension { @@ -18,8 +8,5 @@ object Kamon { } def apply[T <: Extension](key: ExtensionId[T])(implicit system: ActorSystem): ActorRef = key(system).manager - - - implicit lazy val actorSystem = ActorSystem("kamon") } diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala deleted file mode 100644 index 4e078201..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala +++ /dev/null @@ -1,42 +0,0 @@ -package kamon.instrumentation - -import akka.actor.{Props, ActorSystem, ActorRef} -import akka.dispatch.{MessageDispatcher, Envelope} -import kamon.{Tracer} -import kamon.instrumentation.SimpleContextPassingInstrumentation.SimpleTraceMessage -import kamon.trace.TraceContext - -trait ActorInstrumentationConfiguration { - def sendMessageTransformation(from: ActorRef, to: ActorRef, message: Any): Any - def receiveInvokeInstrumentation(system: ActorSystem, self: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): ActorReceiveInvokeInstrumentation -} - - -trait ActorReceiveInvokeInstrumentation { - def preReceive(envelope: Envelope): (Envelope, Option[TraceContext]) -} - -object ActorReceiveInvokeInstrumentation { - val noopPreReceive = new ActorReceiveInvokeInstrumentation{ - def preReceive(envelope: Envelope): (Envelope, Option[TraceContext]) = (envelope, None) - } -} - -class SimpleContextPassingInstrumentation extends ActorInstrumentationConfiguration { - def sendMessageTransformation(from: ActorRef, to: ActorRef, message: Any): Any = SimpleTraceMessage(message, Tracer.context) - - def receiveInvokeInstrumentation(system: ActorSystem, self: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): ActorReceiveInvokeInstrumentation = { - new ActorReceiveInvokeInstrumentation { - def preReceive(envelope: Envelope): (Envelope, Option[TraceContext]) = envelope match { - case env @ Envelope(SimpleTraceMessage(msg, ctx), _) => (env.copy(message = msg), ctx) - case anyOther => (anyOther, None) - } - } - } -} - -object SimpleContextPassingInstrumentation { - case class SimpleTraceMessage(message: Any, context: Option[TraceContext]) -} - - diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingInstrumentation.scala deleted file mode 100644 index 9b53bd5d..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingInstrumentation.scala +++ /dev/null @@ -1,32 +0,0 @@ -package kamon.instrumentation - -import org.aspectj.lang.annotation.{Around, Pointcut, DeclareMixin, Aspect} -import kamon.{Tracer} -import org.aspectj.lang.ProceedingJoinPoint -import org.slf4j.MDC -import kamon.trace.TraceContext - -@Aspect -class ActorLoggingInstrumentation { - - - @DeclareMixin("akka.event.Logging.LogEvent+") - def traceContextMixin: ContextAware = new ContextAware { - def traceContext: Option[TraceContext] = Tracer.context() - } - - @Pointcut("execution(* akka.event.slf4j.Slf4jLogger.withMdc(..)) && args(logSource, logEvent, logStatement)") - def withMdcInvocation(logSource: String, logEvent: ContextAware, logStatement: () => _): Unit = {} - - @Around("withMdcInvocation(logSource, logEvent, logStatement)") - def putTraceContextInMDC(pjp: ProceedingJoinPoint, logSource: String, logEvent: ContextAware, logStatement: () => _): Unit = { - logEvent.traceContext match { - case Some(ctx) => - MDC.put("uow", ctx.uow) - pjp.proceed() - MDC.remove("uow") - - case None => pjp.proceed() - } - } -} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala deleted file mode 100644 index 9b5ce0a4..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala +++ /dev/null @@ -1,50 +0,0 @@ -package kamon.instrumentation - -import org.aspectj.lang.annotation._ -import org.aspectj.lang.ProceedingJoinPoint -import akka.actor.{Props, ActorSystem, ActorRef} -import kamon.{Tracer} -import akka.dispatch.{Envelope, MessageDispatcher} -import com.codahale.metrics.Timer -import scala.Some -import kamon.trace.context.TracingAwareContext -import kamon.trace.TraceContext - -case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context) -case class DefaultTracingAwareEnvelopeContext(traceContext: Option[TraceContext] = Tracer.traceContext.value, timestamp: Long = System.nanoTime) extends TracingAwareContext - -@Aspect -class ActorCellInvokeInstrumentation { - - @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, dispatcher, parent)") - def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {} - - @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && args(envelope)") - def invokingActorBehaviourAtActorCell(envelope: Envelope) = {} - - @Around("invokingActorBehaviourAtActorCell(envelope)") - def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = { - //safe cast - val msgContext = envelope.asInstanceOf[TracingAwareContext].traceContext - - Tracer.traceContext.withValue(msgContext) { - pjp.proceed() - } - } -} - -@Aspect -class EnvelopeTracingContext { - - @DeclareMixin("akka.dispatch.Envelope") - def mixin: TracingAwareContext = DefaultTracingAwareEnvelopeContext() - - @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)") - def requestRecordInit(ctx: TracingAwareContext): Unit = {} - - @After("requestRecordInit(ctx)") - def whenCreatedRequestRecord(ctx: TracingAwareContext): Unit = { - // Necessary to force the initialization of TracingAwareRequestContext at the moment of creation. - ctx.traceContext - } -} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala index b4f8a475..3a091775 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala @@ -21,7 +21,7 @@ class ActorSystemInstrumentation { @After("actorSystemInstantiation(name, applicationConfig, classLoader)") def registerActorSystem(name: String, applicationConfig: Config, classLoader: ClassLoader): Unit = { - Kamon.Metric.registerActorSystem(name) + //Kamon.Metric.registerActorSystem(name) } } @@ -35,7 +35,7 @@ class ForkJoinPoolInstrumentation { @After("forkJoinPoolInstantiation(parallelism, threadFactory, exceptionHandler)") def initializeMetrics(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler): Unit = { - val (actorSystemName, dispatcherName) = threadFactory match { + /*val (actorSystemName, dispatcherName) = threadFactory match { case mtf: MonitorableThreadFactory => splitName(mtf.name, Kamon.Metric.actorSystemNames) case _ => ("Unknown", "Unknown") } @@ -45,7 +45,7 @@ class ForkJoinPoolInstrumentation { activeThreadsHistogram = m.activeThreadCount poolSizeHistogram = m.poolSize println(s"Registered $dispatcherName for actor system $actorSystemName") - } + }*/ } def splitName(threadFactoryName: String, knownActorSystems: List[String]): (String, String) = { diff --git a/kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala b/kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala deleted file mode 100644 index 70f3e54a..00000000 --- a/kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala +++ /dev/null @@ -1,51 +0,0 @@ -package kamon.metric - -import com.codahale.metrics -import metrics._ -import java.util.concurrent.TimeUnit -import java.util -import com.newrelic.api.agent.NewRelic -import scala.collection.JavaConverters._ - - -class NewRelicReporter(registry: MetricRegistry, name: String,filter: MetricFilter, rateUnit: TimeUnit, durationUnit: TimeUnit) extends ScheduledReporter(registry, name, filter, rateUnit, durationUnit) { - - - - private[NewRelicReporter] def processMeter(name: String, meter: Meter) { - NewRelic.recordMetric("Custom/Actor/MessagesPerSecond", meter.getMeanRate().toFloat) - } - - private[NewRelicReporter] def processCounter(name:String, counter:Counter) { - println(s"Logging to NewRelic: ${counter.getCount}") - - } - - -/* def processGauge(name: String, gauge: Gauge[_]) = { - println(s"the value is: "+gauge.getValue) - NewRelic.recordMetric("Custom/ActorSystem/activeCount", gauge.getValue.asInstanceOf[Float]) - }*/ - - - def report(gauges: util.SortedMap[String, Gauge[_]], counters: util.SortedMap[String, Counter], histograms: util.SortedMap[String, metrics.Histogram], meters: util.SortedMap[String, Meter], timers: util.SortedMap[String, Timer]) { - //Process Meters - meters.asScala.map{case(name, meter) => processMeter(name, meter)} - - //Process Meters - counters.asScala.map{case(name, counter) => processCounter(name, counter)} - - // Gauges - gauges.asScala.foreach{ case (name, gauge) => { - val measure: Float = gauge.getValue.asInstanceOf[Number].floatValue() - val fullMetricName = "Custom" + name - NewRelic.recordMetric(fullMetricName, measure) - }} - } - - -} - -object NewRelicReporter { - def apply(registry: MetricRegistry) = new NewRelicReporter(registry, "NewRelic-reporter", metrics.MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.MILLISECONDS) -} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala b/kamon-core/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala deleted file mode 100644 index 4bc49496..00000000 --- a/kamon-core/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala +++ /dev/null @@ -1,18 +0,0 @@ -package kamon.newrelic - -import akka.actor.Actor -import akka.event.Logging.Error -import akka.event.Logging.{LoggerInitialized, InitializeLogger} -import com.newrelic.api.agent.NewRelic -import NewRelic.noticeError - -class NewRelicErrorLogger extends Actor { - def receive = { - case InitializeLogger(_) => sender ! LoggerInitialized - case error @ Error(cause, logSource, logClass, message) => notifyError(error) - } - - def notifyError(error: Error): Unit = { - noticeError(error.cause) - } -} diff --git a/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala b/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala deleted file mode 100644 index 106f27e2..00000000 --- a/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala +++ /dev/null @@ -1,52 +0,0 @@ -package kamon.newrelic - -import akka.actor.Actor -import kamon.trace.UowTrace -import com.newrelic.api.agent.{Response, Request, Trace, NewRelic} -import kamon.trace.UowTracing.{WebExternal, WebExternalFinish, WebExternalStart} -import java.util -import java.util.Date - - -class NewRelicReporting extends Actor { - def receive = { - case trace: UowTrace => recordTransaction(trace) - } - - def recordTransaction(uowTrace: UowTrace): Unit = { - val time = ((uowTrace.segments.last.timestamp - uowTrace.segments.head.timestamp)/1E9) - - NewRelic.recordMetric("WebTransaction/Custom" + uowTrace.name, time.toFloat ) - NewRelic.recordMetric("WebTransaction", time.toFloat) - NewRelic.recordMetric("HttpDispatcher", time.toFloat) - - uowTrace.segments.collect { case we: WebExternal => we }.foreach { webExternalTrace => - val external = ((webExternalTrace.finish - webExternalTrace.start)/1E9).toFloat - - println("Web External: " + webExternalTrace) - NewRelic.recordMetric(s"External/${webExternalTrace.host}/http", external) - NewRelic.recordMetric(s"External/${webExternalTrace.host}/all", external) - NewRelic.recordMetric(s"External/${webExternalTrace.host}/http/WebTransaction/Custom" + uowTrace.name, external) - } - - - val allExternals = uowTrace.segments.collect { case we: WebExternal => we } sortBy(_.timestamp) - - - def measureExternal(accum: Long, lastEnd: Long, segments: Seq[WebExternal]): Long = segments match { - case Nil => accum - case head :: tail => - if(head.start > lastEnd) - measureExternal(accum + (head.finish-head.start), head.finish, tail) - else - measureExternal(accum + (head.finish-lastEnd), head.finish, tail) - } - - val external = measureExternal(0, 0, allExternals) / 1E9 - - - NewRelic.recordMetric(s"External/all", external.toFloat) - NewRelic.recordMetric(s"External/allWeb", external.toFloat) - - } -} diff --git a/kamon-core/src/main/scala/kamon/trace/UowDirectives.scala b/kamon-core/src/main/scala/kamon/trace/UowDirectives.scala deleted file mode 100644 index 392f53b8..00000000 --- a/kamon-core/src/main/scala/kamon/trace/UowDirectives.scala +++ /dev/null @@ -1,28 +0,0 @@ -package kamon.trace - -import spray.routing.directives.BasicDirectives -import spray.routing._ -import kamon.Tracer -import java.util.concurrent.atomic.AtomicLong -import scala.util.Try -import java.net.InetAddress - -trait UowDirectives extends BasicDirectives { - def uow: Directive0 = mapRequest { request => - val uowHeader = request.headers.find(_.name == "X-UOW") - - val generatedUow = uowHeader.map(_.value).getOrElse(UowDirectives.newUow) - Tracer.set(Tracer.context().getOrElse(Tracer.newTraceContext()).copy(uow = generatedUow)) - - request - } -} - -object UowDirectives { - val uowCounter = new AtomicLong - - val hostnamePrefix = Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-localhost") - - def newUow = "%s-%s".format(hostnamePrefix, uowCounter.incrementAndGet()) - -} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/context/TracingAwareContext.scala b/kamon-core/src/main/scala/kamon/trace/context/TracingAwareContext.scala deleted file mode 100644 index c8d0d4f0..00000000 --- a/kamon-core/src/main/scala/kamon/trace/context/TracingAwareContext.scala +++ /dev/null @@ -1,8 +0,0 @@ -package kamon.trace.context - -import kamon.trace.TraceContext - -trait TracingAwareContext { - def traceContext: Option[TraceContext] - def timestamp: Long -} \ No newline at end of file diff --git a/kamon-core/src/main/scala/test/SimpleRequestProcessor.scala b/kamon-core/src/main/scala/test/SimpleRequestProcessor.scala deleted file mode 100644 index da3c6c6a..00000000 --- a/kamon-core/src/main/scala/test/SimpleRequestProcessor.scala +++ /dev/null @@ -1,101 +0,0 @@ -package test - -import akka.actor._ -import kamon.Tracer -import spray.routing.SimpleRoutingApp -import akka.util.Timeout -import spray.httpx.RequestBuilding -import scala.concurrent.{Await, Future} -import kamon.trace.UowDirectives - -object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuilding with UowDirectives { - import scala.concurrent.duration._ - import spray.client.pipelining._ - import akka.pattern.ask - - implicit val system = ActorSystem("test") - import system.dispatcher - - val act = system.actorOf(Props(new Actor { - def receive: Actor.Receive = { case any => sender ! any } - }), "com.despegar-2:[]s-w@&,*") - - implicit val timeout = Timeout(30 seconds) - - val pipeline = sendReceive - val replier = system.actorOf(Props[Replier]) - - startServer(interface = "localhost", port = 9090) { - get { - path("test"){ - uow { - complete { - val futures = pipeline(Get("http://10.254.209.14:8000/")).map(r => "Ok") :: pipeline(Get("http://10.254.209.14:8000/")).map(r => "Ok") :: Nil - - Future.sequence(futures).map(l => "Ok") - } - } - } ~ - path("reply" / Segment) { reqID => - uow { - complete { - if (Tracer.context().isEmpty) - println("ROUTE NO CONTEXT") - - (replier ? reqID).mapTo[String] - } - } - } ~ - path("ok") { - complete("ok") - } ~ - path("future") { - dynamic { - complete(Future { "OK" }) - } - } ~ - path("error") { - complete { - throw new NullPointerException - "okk" - } - } - } - } - -} - -object Verifier extends App { - - def go: Unit = { - import scala.concurrent.duration._ - import spray.client.pipelining._ - - implicit val system = ActorSystem("test") - import system.dispatcher - - implicit val timeout = Timeout(30 seconds) - - val pipeline = sendReceive - - val futures = Future.sequence(for(i <- 1 to 500) yield { - pipeline(Get("http://127.0.0.1:9090/reply/"+i)).map(r => r.entity.asString == i.toString) - }) - println("Everything is: "+ Await.result(futures, 10 seconds).forall(a => a == true)) - } - - - - -} - -class Replier extends Actor with ActorLogging { - def receive = { - case anything => - if(Tracer.context.isEmpty) - log.warning("PROCESSING A MESSAGE WITHOUT CONTEXT") - - log.info("Processing at the Replier") - sender ! anything - } -} diff --git a/kamon-core/src/test/scala/ExtraSpec.scala b/kamon-core/src/test/scala/ExtraSpec.scala deleted file mode 100644 index b8dc053d..00000000 --- a/kamon-core/src/test/scala/ExtraSpec.scala +++ /dev/null @@ -1,34 +0,0 @@ -import akka.actor.ActorSystem -import akka.testkit.TestKit -import org.scalatest.WordSpecLike -import shapeless._ - -class ExtraSpec extends TestKit(ActorSystem("ExtraSpec")) with WordSpecLike { - - "the Extra pattern helper" should { - "be constructed from a finite number of types" in { - Extra.expecting[String :: Int :: HNil].as[Person] - } - } - - case class Person(name: String, age: Int) -} - -/** - * Desired Features: - * 1. Expect messages of different types, apply a function and forward to some other. - */ - -object Extra { - def expecting[T <: HList] = new Object { - def as[U <: Product] = ??? - } -} - -/* -extra of { - expect[A] in { actor ! msg} - expect[A] in { actor ! msg} -} as (A, A) pipeTo (z)*/ - - diff --git a/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala b/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala deleted file mode 100644 index cdfa2813..00000000 --- a/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala +++ /dev/null @@ -1,94 +0,0 @@ -package akka.instrumentation - -import org.scalatest.{WordSpecLike, Matchers} -import akka.actor.{ActorRef, Actor, Props, ActorSystem} - -import akka.testkit.{ImplicitSender, TestKit} -import kamon.{Tracer} -import akka.pattern.{pipe, ask} -import akka.util.Timeout -import scala.concurrent.duration._ -import scala.concurrent.{Await, Future} -import akka.routing.RoundRobinRouter -import kamon.trace.TraceContext - - -class ActorInstrumentationSpec extends TestKit(ActorSystem("ActorInstrumentationSpec")) with WordSpecLike with Matchers with ImplicitSender { - implicit val executionContext = system.dispatcher - - "an instrumented actor ref" when { - "used inside the context of a transaction" should { - "propagate the trace context using bang" in new TraceContextEchoFixture { - echo ! "test" - - expectMsg(Some(testTraceContext)) - } - - "propagate the trace context using tell" in new TraceContextEchoFixture { - echo.tell("test", testActor) - - expectMsg(Some(testTraceContext)) - } - - "propagate the trace context using ask" in new TraceContextEchoFixture { - implicit val timeout = Timeout(1 seconds) - (echo ? "test") pipeTo(testActor) - - expectMsg(Some(testTraceContext)) - } - - "propagate the trace context to actors behind a router" in new RoutedTraceContextEchoFixture { - val contexts: Seq[Option[TraceContext]] = for(_ <- 1 to 10) yield Some(tellWithNewContext(echo, "test")) - - expectMsgAllOf(contexts: _*) - } - - /*"propagate with many asks" in { - val echo = system.actorOf(Props[TraceContextEcho]) - val iterations = 50000 - implicit val timeout = Timeout(10 seconds) - - val futures = for(_ <- 1 to iterations) yield { - Tracer.start - val result = (echo ? "test") - Tracer.clear - - result - } - - val allResults = Await.result(Future.sequence(futures), 10 seconds) - assert(iterations == allResults.collect { - case Some(_) => 1 - }.sum) - }*/ - } - } - - trait TraceContextEchoFixture { - val testTraceContext = Tracer.newTraceContext() - val echo = system.actorOf(Props[TraceContextEcho]) - - Tracer.set(testTraceContext) - } - - trait RoutedTraceContextEchoFixture extends TraceContextEchoFixture { - override val echo = system.actorOf(Props[TraceContextEcho].withRouter(RoundRobinRouter(nrOfInstances = 10))) - - def tellWithNewContext(target: ActorRef, message: Any): TraceContext = { - val context = Tracer.newTraceContext() - Tracer.set(context) - - target ! message - context - } - } - -} - -class TraceContextEcho extends Actor { - def receive = { - case msg: String ⇒ sender ! Tracer.context() - } -} - - diff --git a/kamon-core/src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala deleted file mode 100644 index 1eab6355..00000000 --- a/kamon-core/src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala +++ /dev/null @@ -1,22 +0,0 @@ -package kamon.instrumentation - -import org.scalatest.{Matchers, WordSpec} -import akka.actor.ActorSystem -import kamon.Kamon - -class ActorSystemInstrumentationSpec extends WordSpec with Matchers { - - // TODO: Selection filters to exclude unwanted actor systems. Read from configuration. - - "the actor system instrumentation" should { - "register all actor systems created" in { - val as1 = ActorSystem("as1") - val as2 = ActorSystem("as2") - - - Kamon.Metric.actorSystem("as1") should not be (None) - Kamon.Metric.actorSystem("as2") should not be (None) - Kamon.Metric.actorSystem("unknown") should be (None) - } - } -} diff --git a/kamon-core/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala deleted file mode 100644 index 89ef61f3..00000000 --- a/kamon-core/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala +++ /dev/null @@ -1,34 +0,0 @@ -package kamon.instrumentation - -import org.scalatest.{Matchers, WordSpec} -import akka.actor.{Actor, Props, ActorSystem} -import kamon.metric.MetricDirectory -import kamon.Kamon - -class DispatcherInstrumentationSpec extends WordSpec with Matchers{ - - - "the dispatcher instrumentation" should { - "instrument a dispatcher that belongs to a non-filtered actor system" in new SingleDispatcherActorSystem { - val x = Kamon.Metric.actorSystem("single-dispatcher").get.dispatchers - (1 to 10).foreach(actor ! _) - - val active = x.get("akka.actor.default-dispatcher").get.activeThreadCount.snapshot - println("Active max: "+active.max) - println("Active min: "+active.min) - - } - } - - - trait SingleDispatcherActorSystem { - val actorSystem = ActorSystem("single-dispatcher") - val actor = actorSystem.actorOf(Props(new Actor { - def receive = { - case a => sender ! a; - } - })) - - } -} - diff --git a/kamon-core/src/test/scala/kamon/instrumentation/MessageQueueInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/MessageQueueInstrumentationSpec.scala deleted file mode 100644 index cc55ec92..00000000 --- a/kamon-core/src/test/scala/kamon/instrumentation/MessageQueueInstrumentationSpec.scala +++ /dev/null @@ -1,53 +0,0 @@ -package kamon.instrumentation - -import org.scalatest.WordSpec -import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram} -import java.util.concurrent.ConcurrentLinkedQueue -import akka.dispatch.{UnboundedMessageQueueSemantics, QueueBasedMessageQueue, Envelope} -import java.util.Queue -import akka.actor.{ActorSystem, Actor} - -class MessageQueueInstrumentationSpec(val actorSystem: ActorSystem) extends WordSpec { - def this() = this(ActorSystem("MessageQueueInstrumentationSpec")) - - - /*"A MonitoredMessageQueue" should { - "update the related histogram when a message is enqueued" in { - new PopulatedMessageQueueFixture { - - assert(histogram.getSnapshot.getMax === 0) - - for(i <- 1 to 3) { enqueueDummyMessage } - - assert(histogram.getCount === 3) - assert(histogram.getSnapshot.getMax === 3) - assert(histogram.getSnapshot.getMin === 1) - } - } - - "update the related histogram when a message is dequeued" in { - new PopulatedMessageQueueFixture { - for(i <- 1 to 3) { enqueueDummyMessage } - assert(histogram.getSnapshot.getMax === 3) - - messageQueue.dequeue() - messageQueue.dequeue() - - assert(histogram.getCount === 5) - assert(histogram.getSnapshot.getMax === 3) - assert(histogram.getSnapshot.getMin === 1) - } - } - } - - trait PopulatedMessageQueueFixture { - - val histogram = new Histogram(new ExponentiallyDecayingReservoir()) -/* val delegate = new ConcurrentLinkedQueue[Envelope]() with QueueBasedMessageQueue with UnboundedMessageQueueSemantics { - final def queue: Queue[Envelope] = this - }*/ - val messageQueue = new MonitoredMessageQueue(delegate, histogram) - - def enqueueDummyMessage = messageQueue.enqueue(Actor.noSender, Envelope("", Actor.noSender, actorSystem)) - }*/ -} diff --git a/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala deleted file mode 100644 index 570f64dd..00000000 --- a/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala +++ /dev/null @@ -1,86 +0,0 @@ -package kamon.instrumentation - -import scala.concurrent.{Await, Promise, Future} -import org.scalatest.{Matchers, OptionValues, WordSpec} -import org.scalatest.concurrent.{ScalaFutures, PatienceConfiguration} -import kamon.{Tracer, Kamon} -import java.util.UUID -import scala.util.Success -import scala.concurrent.duration._ -import java.util.concurrent.TimeUnit -import akka.actor.ActorSystem -import kamon.trace.TraceContext - - -class RunnableInstrumentationSpec extends WordSpec with Matchers with ScalaFutures with PatienceConfiguration with OptionValues { - - "a instrumented runnable" when { - "created in a thread that does have a TraceContext" must { - "preserve the TraceContext" which { - "should be available during the run method execution" in new FutureWithContextFixture { - - whenReady(futureWithContext) { result => - result.value should equal(testContext) - } - } - - "should be available during the execution of onComplete callbacks" in new FutureWithContextFixture { - - val onCompleteContext = Promise[Option[TraceContext]]() - - Tracer.traceContext.withValue(Some(testContext)) { - futureWithContext.onComplete({ - case _ => println("Completing second promise from: "+Thread.currentThread().getName + " With Context: " + Tracer.traceContext.value); onCompleteContext.complete(Success(Tracer.traceContext.value)) - }) - } - - whenReady(onCompleteContext.future) { result => - result should equal(Some(testContext)) - } - } - } - } - - "created in a thread that doest have a TraceContext" must { - "not capture any TraceContext for the body execution" in new FutureWithoutContextFixture{ - whenReady(futureWithoutContext) { result => - result should equal(None) - } - } - - "not make any TraceContext available during the onComplete callback" in new FutureWithoutContextFixture { - val onCompleteContext = Promise[Option[TraceContext]]() - - futureWithoutContext.onComplete { - case _ => onCompleteContext.complete(Success(Tracer.traceContext.value)) - } - - whenReady(onCompleteContext.future) { result => - result should equal(None) - } - } - } - } - - - /** - * We are using Futures for the test since they exercise Runnables in the back and also resemble the real use case we have. - */ - implicit val testActorSystem = ActorSystem("test-actorsystem") - implicit val execContext = testActorSystem.dispatcher - - class FutureWithContextFixture { - val testContext = TraceContext() - - var futureWithContext: Future[Option[TraceContext]] = _ - Tracer.traceContext.withValue(Some(testContext)) { - futureWithContext = Future { Tracer.traceContext.value } - } - } - - trait FutureWithoutContextFixture { - val futureWithoutContext = Future { Tracer.traceContext.value } - } -} - - diff --git a/kamon-core/src/test/scala/kamon/trace/TraceAggregatorSpec.scala b/kamon-core/src/test/scala/kamon/trace/TraceAggregatorSpec.scala deleted file mode 100644 index 60b5f06d..00000000 --- a/kamon-core/src/test/scala/kamon/trace/TraceAggregatorSpec.scala +++ /dev/null @@ -1,36 +0,0 @@ -package kamon.trace - -import org.scalatest.{WordSpecLike, WordSpec} -import akka.testkit.{TestKitBase, TestKit} -import akka.actor.ActorSystem -import scala.concurrent.duration._ -import kamon.trace.UowTracing.{Finish, Rename, Start} - -class TraceAggregatorSpec extends TestKit(ActorSystem("TraceAggregatorSpec")) with WordSpecLike { - - "a TraceAggregator" should { - "send a UowTrace message out after receiving a Finish message" in new AggregatorFixture { - within(1 second) { - aggregator ! Start() - aggregator ! Finish() - - expectMsg(UowTrace("UNKNOWN", Seq(Start(), Finish()))) - } - } - - "change the uow name after receiving a Rename message" in new AggregatorFixture { - within(1 second) { - aggregator ! Start() - aggregator ! Rename("test-uow") - aggregator ! Finish() - - expectMsg(UowTrace("test-uow", Seq(Start(), Finish()))) - } - } - } - - - trait AggregatorFixture { - val aggregator = system.actorOf(UowTraceAggregator.props(testActor, 10 seconds)) - } -} -- cgit v1.2.3