aboutsummaryrefslogtreecommitdiff
path: root/kamon-core
diff options
context:
space:
mode:
authorIvan Topolnak <ivantopo@gmail.com>2013-11-05 18:38:39 -0300
committerIvan Topolnak <ivantopo@gmail.com>2013-11-05 18:38:39 -0300
commit2b63540e5fffab545d0846cfb3dab5c0e1d0c9e1 (patch)
tree56c4ad1f025c9144376cd4463ad4d4a23e37b571 /kamon-core
parent5127c3bb83cd6fe90e071720d995cfb53d913e6a (diff)
downloadKamon-2b63540e5fffab545d0846cfb3dab5c0e1d0c9e1.tar.gz
Kamon-2b63540e5fffab545d0846cfb3dab5c0e1d0c9e1.tar.bz2
Kamon-2b63540e5fffab545d0846cfb3dab5c0e1d0c9e1.zip
basic separation of concerns between sub-projects
Diffstat (limited to 'kamon-core')
-rw-r--r--kamon-core/src/main/resources/META-INF/aop_remove_when_possible.xml (renamed from kamon-core/src/main/resources/META-INF/aop.xml)6
-rw-r--r--kamon-core/src/main/resources/logback.xml12
-rw-r--r--kamon-core/src/main/resources/reference.conf11
-rw-r--r--kamon-core/src/main/scala/kamon/AkkaExtensionSwap.scala14
-rw-r--r--kamon-core/src/main/scala/kamon/Kamon.scala13
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala42
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingInstrumentation.scala32
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala50
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala6
-rw-r--r--kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala51
-rw-r--r--kamon-core/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala18
-rw-r--r--kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala52
-rw-r--r--kamon-core/src/main/scala/kamon/trace/UowDirectives.scala28
-rw-r--r--kamon-core/src/main/scala/kamon/trace/context/TracingAwareContext.scala8
-rw-r--r--kamon-core/src/main/scala/test/SimpleRequestProcessor.scala101
-rw-r--r--kamon-core/src/test/scala/ExtraSpec.scala34
-rw-r--r--kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala94
-rw-r--r--kamon-core/src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala22
-rw-r--r--kamon-core/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala34
-rw-r--r--kamon-core/src/test/scala/kamon/instrumentation/MessageQueueInstrumentationSpec.scala53
-rw-r--r--kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala86
-rw-r--r--kamon-core/src/test/scala/kamon/trace/TraceAggregatorSpec.scala36
22 files changed, 20 insertions, 783 deletions
diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop_remove_when_possible.xml
index c63e17e5..207bf1b9 100644
--- a/kamon-core/src/main/resources/META-INF/aop.xml
+++ b/kamon-core/src/main/resources/META-INF/aop_remove_when_possible.xml
@@ -7,14 +7,14 @@
<aspects>
- <aspect name="kamon.instrumentation.EnvelopeTracingContext"/>
- <aspect name="kamon.instrumentation.ActorCellInvokeInstrumentation"/>
+ <aspect name="kamon.trace.instrumentation.EnvelopeTracingContext"/>
+ <aspect name="kamon.trace.instrumentation.ActorCellInvokeInstrumentation"/>
<aspect name="kamon.trace.instrumentation.RunnableTracing" />
<aspect name="kamon.instrumentation.SprayRequestContextTracing"/>
<aspect name="kamon.instrumentation.SprayOpenRequestContextTracing"/>
<aspect name = "kamon.instrumentation.SprayServerInstrumentation"/>
<aspect name="kamon.instrumentation.ActorSystemInstrumentation"/>
- <aspect name="kamon.instrumentation.ActorLoggingInstrumentation"/>
+ <aspect name="kamon.trace.instrumentation.ActorLoggingInstrumentation"/>
<!--<aspect name="kamon.instrumentation.MessageQueueInstrumentation" />-->
<!--<aspect name="kamon.instrumentation.InceptionAspect"/>-->
diff --git a/kamon-core/src/main/resources/logback.xml b/kamon-core/src/main/resources/logback.xml
deleted file mode 100644
index 2ae1e3bd..00000000
--- a/kamon-core/src/main/resources/logback.xml
+++ /dev/null
@@ -1,12 +0,0 @@
-<configuration scan="true">
- <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
- <encoder>
- <pattern>%date{HH:mm:ss.SSS} %-5level [%X{uow}][%X{requestId}] [%thread] %logger{55} - %msg%n</pattern>
- </encoder>
- </appender>
-
- <root level="debug">
- <appender-ref ref="STDOUT" />
- </root>
-
-</configuration>
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf
deleted file mode 100644
index 29532595..00000000
--- a/kamon-core/src/main/resources/reference.conf
+++ /dev/null
@@ -1,11 +0,0 @@
-akka {
- loggers = ["kamon.newrelic.NewRelicErrorLogger", "akka.event.slf4j.Slf4jLogger"]
-}
-
-
-
-
-
-
-
-
diff --git a/kamon-core/src/main/scala/kamon/AkkaExtensionSwap.scala b/kamon-core/src/main/scala/kamon/AkkaExtensionSwap.scala
new file mode 100644
index 00000000..3d503d54
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/AkkaExtensionSwap.scala
@@ -0,0 +1,14 @@
+package kamon
+
+import akka.actor.{ActorSystem, ExtensionId}
+import java.util.concurrent.ConcurrentHashMap
+
+object AkkaExtensionSwap {
+ def swap(system: ActorSystem, key: ExtensionId[_], value: Kamon.Extension): Unit = {
+ val extensionsField = system.getClass.getDeclaredField("extensions")
+ extensionsField.setAccessible(true)
+
+ val extensions = extensionsField.get(system).asInstanceOf[ConcurrentHashMap[ExtensionId[_], AnyRef]]
+ extensions.put(key, value)
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala
index 8c934f60..5d0d77a3 100644
--- a/kamon-core/src/main/scala/kamon/Kamon.scala
+++ b/kamon-core/src/main/scala/kamon/Kamon.scala
@@ -1,16 +1,6 @@
package kamon
import akka.actor._
-import kamon.metric.{HistogramSnapshot, ActorSystemMetrics}
-import scala.concurrent.duration.FiniteDuration
-import scala.collection.concurrent.TrieMap
-import kamon.instrumentation.{SimpleContextPassingInstrumentation, ActorInstrumentationConfiguration}
-import kamon.metric.ActorSystemMetrics
-
-
-object Instrument {
- val instrumentation: ActorInstrumentationConfiguration = new SimpleContextPassingInstrumentation
-}
object Kamon {
trait Extension extends akka.actor.Extension {
@@ -18,8 +8,5 @@ object Kamon {
}
def apply[T <: Extension](key: ExtensionId[T])(implicit system: ActorSystem): ActorRef = key(system).manager
-
-
- implicit lazy val actorSystem = ActorSystem("kamon")
}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala
deleted file mode 100644
index 4e078201..00000000
--- a/kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-package kamon.instrumentation
-
-import akka.actor.{Props, ActorSystem, ActorRef}
-import akka.dispatch.{MessageDispatcher, Envelope}
-import kamon.{Tracer}
-import kamon.instrumentation.SimpleContextPassingInstrumentation.SimpleTraceMessage
-import kamon.trace.TraceContext
-
-trait ActorInstrumentationConfiguration {
- def sendMessageTransformation(from: ActorRef, to: ActorRef, message: Any): Any
- def receiveInvokeInstrumentation(system: ActorSystem, self: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): ActorReceiveInvokeInstrumentation
-}
-
-
-trait ActorReceiveInvokeInstrumentation {
- def preReceive(envelope: Envelope): (Envelope, Option[TraceContext])
-}
-
-object ActorReceiveInvokeInstrumentation {
- val noopPreReceive = new ActorReceiveInvokeInstrumentation{
- def preReceive(envelope: Envelope): (Envelope, Option[TraceContext]) = (envelope, None)
- }
-}
-
-class SimpleContextPassingInstrumentation extends ActorInstrumentationConfiguration {
- def sendMessageTransformation(from: ActorRef, to: ActorRef, message: Any): Any = SimpleTraceMessage(message, Tracer.context)
-
- def receiveInvokeInstrumentation(system: ActorSystem, self: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): ActorReceiveInvokeInstrumentation = {
- new ActorReceiveInvokeInstrumentation {
- def preReceive(envelope: Envelope): (Envelope, Option[TraceContext]) = envelope match {
- case env @ Envelope(SimpleTraceMessage(msg, ctx), _) => (env.copy(message = msg), ctx)
- case anyOther => (anyOther, None)
- }
- }
- }
-}
-
-object SimpleContextPassingInstrumentation {
- case class SimpleTraceMessage(message: Any, context: Option[TraceContext])
-}
-
-
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingInstrumentation.scala
deleted file mode 100644
index 9b53bd5d..00000000
--- a/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingInstrumentation.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-package kamon.instrumentation
-
-import org.aspectj.lang.annotation.{Around, Pointcut, DeclareMixin, Aspect}
-import kamon.{Tracer}
-import org.aspectj.lang.ProceedingJoinPoint
-import org.slf4j.MDC
-import kamon.trace.TraceContext
-
-@Aspect
-class ActorLoggingInstrumentation {
-
-
- @DeclareMixin("akka.event.Logging.LogEvent+")
- def traceContextMixin: ContextAware = new ContextAware {
- def traceContext: Option[TraceContext] = Tracer.context()
- }
-
- @Pointcut("execution(* akka.event.slf4j.Slf4jLogger.withMdc(..)) && args(logSource, logEvent, logStatement)")
- def withMdcInvocation(logSource: String, logEvent: ContextAware, logStatement: () => _): Unit = {}
-
- @Around("withMdcInvocation(logSource, logEvent, logStatement)")
- def putTraceContextInMDC(pjp: ProceedingJoinPoint, logSource: String, logEvent: ContextAware, logStatement: () => _): Unit = {
- logEvent.traceContext match {
- case Some(ctx) =>
- MDC.put("uow", ctx.uow)
- pjp.proceed()
- MDC.remove("uow")
-
- case None => pjp.proceed()
- }
- }
-}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
deleted file mode 100644
index 9b5ce0a4..00000000
--- a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-package kamon.instrumentation
-
-import org.aspectj.lang.annotation._
-import org.aspectj.lang.ProceedingJoinPoint
-import akka.actor.{Props, ActorSystem, ActorRef}
-import kamon.{Tracer}
-import akka.dispatch.{Envelope, MessageDispatcher}
-import com.codahale.metrics.Timer
-import scala.Some
-import kamon.trace.context.TracingAwareContext
-import kamon.trace.TraceContext
-
-case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context)
-case class DefaultTracingAwareEnvelopeContext(traceContext: Option[TraceContext] = Tracer.traceContext.value, timestamp: Long = System.nanoTime) extends TracingAwareContext
-
-@Aspect
-class ActorCellInvokeInstrumentation {
-
- @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, dispatcher, parent)")
- def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {}
-
- @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && args(envelope)")
- def invokingActorBehaviourAtActorCell(envelope: Envelope) = {}
-
- @Around("invokingActorBehaviourAtActorCell(envelope)")
- def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = {
- //safe cast
- val msgContext = envelope.asInstanceOf[TracingAwareContext].traceContext
-
- Tracer.traceContext.withValue(msgContext) {
- pjp.proceed()
- }
- }
-}
-
-@Aspect
-class EnvelopeTracingContext {
-
- @DeclareMixin("akka.dispatch.Envelope")
- def mixin: TracingAwareContext = DefaultTracingAwareEnvelopeContext()
-
- @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)")
- def requestRecordInit(ctx: TracingAwareContext): Unit = {}
-
- @After("requestRecordInit(ctx)")
- def whenCreatedRequestRecord(ctx: TracingAwareContext): Unit = {
- // Necessary to force the initialization of TracingAwareRequestContext at the moment of creation.
- ctx.traceContext
- }
-}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
index b4f8a475..3a091775 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
@@ -21,7 +21,7 @@ class ActorSystemInstrumentation {
@After("actorSystemInstantiation(name, applicationConfig, classLoader)")
def registerActorSystem(name: String, applicationConfig: Config, classLoader: ClassLoader): Unit = {
- Kamon.Metric.registerActorSystem(name)
+ //Kamon.Metric.registerActorSystem(name)
}
}
@@ -35,7 +35,7 @@ class ForkJoinPoolInstrumentation {
@After("forkJoinPoolInstantiation(parallelism, threadFactory, exceptionHandler)")
def initializeMetrics(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler): Unit = {
- val (actorSystemName, dispatcherName) = threadFactory match {
+ /*val (actorSystemName, dispatcherName) = threadFactory match {
case mtf: MonitorableThreadFactory => splitName(mtf.name, Kamon.Metric.actorSystemNames)
case _ => ("Unknown", "Unknown")
}
@@ -45,7 +45,7 @@ class ForkJoinPoolInstrumentation {
activeThreadsHistogram = m.activeThreadCount
poolSizeHistogram = m.poolSize
println(s"Registered $dispatcherName for actor system $actorSystemName")
- }
+ }*/
}
def splitName(threadFactoryName: String, knownActorSystems: List[String]): (String, String) = {
diff --git a/kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala b/kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala
deleted file mode 100644
index 70f3e54a..00000000
--- a/kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-package kamon.metric
-
-import com.codahale.metrics
-import metrics._
-import java.util.concurrent.TimeUnit
-import java.util
-import com.newrelic.api.agent.NewRelic
-import scala.collection.JavaConverters._
-
-
-class NewRelicReporter(registry: MetricRegistry, name: String,filter: MetricFilter, rateUnit: TimeUnit, durationUnit: TimeUnit) extends ScheduledReporter(registry, name, filter, rateUnit, durationUnit) {
-
-
-
- private[NewRelicReporter] def processMeter(name: String, meter: Meter) {
- NewRelic.recordMetric("Custom/Actor/MessagesPerSecond", meter.getMeanRate().toFloat)
- }
-
- private[NewRelicReporter] def processCounter(name:String, counter:Counter) {
- println(s"Logging to NewRelic: ${counter.getCount}")
-
- }
-
-
-/* def processGauge(name: String, gauge: Gauge[_]) = {
- println(s"the value is: "+gauge.getValue)
- NewRelic.recordMetric("Custom/ActorSystem/activeCount", gauge.getValue.asInstanceOf[Float])
- }*/
-
-
- def report(gauges: util.SortedMap[String, Gauge[_]], counters: util.SortedMap[String, Counter], histograms: util.SortedMap[String, metrics.Histogram], meters: util.SortedMap[String, Meter], timers: util.SortedMap[String, Timer]) {
- //Process Meters
- meters.asScala.map{case(name, meter) => processMeter(name, meter)}
-
- //Process Meters
- counters.asScala.map{case(name, counter) => processCounter(name, counter)}
-
- // Gauges
- gauges.asScala.foreach{ case (name, gauge) => {
- val measure: Float = gauge.getValue.asInstanceOf[Number].floatValue()
- val fullMetricName = "Custom" + name
- NewRelic.recordMetric(fullMetricName, measure)
- }}
- }
-
-
-}
-
-object NewRelicReporter {
- def apply(registry: MetricRegistry) = new NewRelicReporter(registry, "NewRelic-reporter", metrics.MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.MILLISECONDS)
-} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala b/kamon-core/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala
deleted file mode 100644
index 4bc49496..00000000
--- a/kamon-core/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala
+++ /dev/null
@@ -1,18 +0,0 @@
-package kamon.newrelic
-
-import akka.actor.Actor
-import akka.event.Logging.Error
-import akka.event.Logging.{LoggerInitialized, InitializeLogger}
-import com.newrelic.api.agent.NewRelic
-import NewRelic.noticeError
-
-class NewRelicErrorLogger extends Actor {
- def receive = {
- case InitializeLogger(_) => sender ! LoggerInitialized
- case error @ Error(cause, logSource, logClass, message) => notifyError(error)
- }
-
- def notifyError(error: Error): Unit = {
- noticeError(error.cause)
- }
-}
diff --git a/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala b/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala
deleted file mode 100644
index 106f27e2..00000000
--- a/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-package kamon.newrelic
-
-import akka.actor.Actor
-import kamon.trace.UowTrace
-import com.newrelic.api.agent.{Response, Request, Trace, NewRelic}
-import kamon.trace.UowTracing.{WebExternal, WebExternalFinish, WebExternalStart}
-import java.util
-import java.util.Date
-
-
-class NewRelicReporting extends Actor {
- def receive = {
- case trace: UowTrace => recordTransaction(trace)
- }
-
- def recordTransaction(uowTrace: UowTrace): Unit = {
- val time = ((uowTrace.segments.last.timestamp - uowTrace.segments.head.timestamp)/1E9)
-
- NewRelic.recordMetric("WebTransaction/Custom" + uowTrace.name, time.toFloat )
- NewRelic.recordMetric("WebTransaction", time.toFloat)
- NewRelic.recordMetric("HttpDispatcher", time.toFloat)
-
- uowTrace.segments.collect { case we: WebExternal => we }.foreach { webExternalTrace =>
- val external = ((webExternalTrace.finish - webExternalTrace.start)/1E9).toFloat
-
- println("Web External: " + webExternalTrace)
- NewRelic.recordMetric(s"External/${webExternalTrace.host}/http", external)
- NewRelic.recordMetric(s"External/${webExternalTrace.host}/all", external)
- NewRelic.recordMetric(s"External/${webExternalTrace.host}/http/WebTransaction/Custom" + uowTrace.name, external)
- }
-
-
- val allExternals = uowTrace.segments.collect { case we: WebExternal => we } sortBy(_.timestamp)
-
-
- def measureExternal(accum: Long, lastEnd: Long, segments: Seq[WebExternal]): Long = segments match {
- case Nil => accum
- case head :: tail =>
- if(head.start > lastEnd)
- measureExternal(accum + (head.finish-head.start), head.finish, tail)
- else
- measureExternal(accum + (head.finish-lastEnd), head.finish, tail)
- }
-
- val external = measureExternal(0, 0, allExternals) / 1E9
-
-
- NewRelic.recordMetric(s"External/all", external.toFloat)
- NewRelic.recordMetric(s"External/allWeb", external.toFloat)
-
- }
-}
diff --git a/kamon-core/src/main/scala/kamon/trace/UowDirectives.scala b/kamon-core/src/main/scala/kamon/trace/UowDirectives.scala
deleted file mode 100644
index 392f53b8..00000000
--- a/kamon-core/src/main/scala/kamon/trace/UowDirectives.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-package kamon.trace
-
-import spray.routing.directives.BasicDirectives
-import spray.routing._
-import kamon.Tracer
-import java.util.concurrent.atomic.AtomicLong
-import scala.util.Try
-import java.net.InetAddress
-
-trait UowDirectives extends BasicDirectives {
- def uow: Directive0 = mapRequest { request =>
- val uowHeader = request.headers.find(_.name == "X-UOW")
-
- val generatedUow = uowHeader.map(_.value).getOrElse(UowDirectives.newUow)
- Tracer.set(Tracer.context().getOrElse(Tracer.newTraceContext()).copy(uow = generatedUow))
-
- request
- }
-}
-
-object UowDirectives {
- val uowCounter = new AtomicLong
-
- val hostnamePrefix = Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-localhost")
-
- def newUow = "%s-%s".format(hostnamePrefix, uowCounter.incrementAndGet())
-
-} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/trace/context/TracingAwareContext.scala b/kamon-core/src/main/scala/kamon/trace/context/TracingAwareContext.scala
deleted file mode 100644
index c8d0d4f0..00000000
--- a/kamon-core/src/main/scala/kamon/trace/context/TracingAwareContext.scala
+++ /dev/null
@@ -1,8 +0,0 @@
-package kamon.trace.context
-
-import kamon.trace.TraceContext
-
-trait TracingAwareContext {
- def traceContext: Option[TraceContext]
- def timestamp: Long
-} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/test/SimpleRequestProcessor.scala b/kamon-core/src/main/scala/test/SimpleRequestProcessor.scala
deleted file mode 100644
index da3c6c6a..00000000
--- a/kamon-core/src/main/scala/test/SimpleRequestProcessor.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-package test
-
-import akka.actor._
-import kamon.Tracer
-import spray.routing.SimpleRoutingApp
-import akka.util.Timeout
-import spray.httpx.RequestBuilding
-import scala.concurrent.{Await, Future}
-import kamon.trace.UowDirectives
-
-object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuilding with UowDirectives {
- import scala.concurrent.duration._
- import spray.client.pipelining._
- import akka.pattern.ask
-
- implicit val system = ActorSystem("test")
- import system.dispatcher
-
- val act = system.actorOf(Props(new Actor {
- def receive: Actor.Receive = { case any => sender ! any }
- }), "com.despegar-2:[]s-w@&,*")
-
- implicit val timeout = Timeout(30 seconds)
-
- val pipeline = sendReceive
- val replier = system.actorOf(Props[Replier])
-
- startServer(interface = "localhost", port = 9090) {
- get {
- path("test"){
- uow {
- complete {
- val futures = pipeline(Get("http://10.254.209.14:8000/")).map(r => "Ok") :: pipeline(Get("http://10.254.209.14:8000/")).map(r => "Ok") :: Nil
-
- Future.sequence(futures).map(l => "Ok")
- }
- }
- } ~
- path("reply" / Segment) { reqID =>
- uow {
- complete {
- if (Tracer.context().isEmpty)
- println("ROUTE NO CONTEXT")
-
- (replier ? reqID).mapTo[String]
- }
- }
- } ~
- path("ok") {
- complete("ok")
- } ~
- path("future") {
- dynamic {
- complete(Future { "OK" })
- }
- } ~
- path("error") {
- complete {
- throw new NullPointerException
- "okk"
- }
- }
- }
- }
-
-}
-
-object Verifier extends App {
-
- def go: Unit = {
- import scala.concurrent.duration._
- import spray.client.pipelining._
-
- implicit val system = ActorSystem("test")
- import system.dispatcher
-
- implicit val timeout = Timeout(30 seconds)
-
- val pipeline = sendReceive
-
- val futures = Future.sequence(for(i <- 1 to 500) yield {
- pipeline(Get("http://127.0.0.1:9090/reply/"+i)).map(r => r.entity.asString == i.toString)
- })
- println("Everything is: "+ Await.result(futures, 10 seconds).forall(a => a == true))
- }
-
-
-
-
-}
-
-class Replier extends Actor with ActorLogging {
- def receive = {
- case anything =>
- if(Tracer.context.isEmpty)
- log.warning("PROCESSING A MESSAGE WITHOUT CONTEXT")
-
- log.info("Processing at the Replier")
- sender ! anything
- }
-}
diff --git a/kamon-core/src/test/scala/ExtraSpec.scala b/kamon-core/src/test/scala/ExtraSpec.scala
deleted file mode 100644
index b8dc053d..00000000
--- a/kamon-core/src/test/scala/ExtraSpec.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-import akka.actor.ActorSystem
-import akka.testkit.TestKit
-import org.scalatest.WordSpecLike
-import shapeless._
-
-class ExtraSpec extends TestKit(ActorSystem("ExtraSpec")) with WordSpecLike {
-
- "the Extra pattern helper" should {
- "be constructed from a finite number of types" in {
- Extra.expecting[String :: Int :: HNil].as[Person]
- }
- }
-
- case class Person(name: String, age: Int)
-}
-
-/**
- * Desired Features:
- * 1. Expect messages of different types, apply a function and forward to some other.
- */
-
-object Extra {
- def expecting[T <: HList] = new Object {
- def as[U <: Product] = ???
- }
-}
-
-/*
-extra of {
- expect[A] in { actor ! msg}
- expect[A] in { actor ! msg}
-} as (A, A) pipeTo (z)*/
-
-
diff --git a/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala b/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala
deleted file mode 100644
index cdfa2813..00000000
--- a/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-package akka.instrumentation
-
-import org.scalatest.{WordSpecLike, Matchers}
-import akka.actor.{ActorRef, Actor, Props, ActorSystem}
-
-import akka.testkit.{ImplicitSender, TestKit}
-import kamon.{Tracer}
-import akka.pattern.{pipe, ask}
-import akka.util.Timeout
-import scala.concurrent.duration._
-import scala.concurrent.{Await, Future}
-import akka.routing.RoundRobinRouter
-import kamon.trace.TraceContext
-
-
-class ActorInstrumentationSpec extends TestKit(ActorSystem("ActorInstrumentationSpec")) with WordSpecLike with Matchers with ImplicitSender {
- implicit val executionContext = system.dispatcher
-
- "an instrumented actor ref" when {
- "used inside the context of a transaction" should {
- "propagate the trace context using bang" in new TraceContextEchoFixture {
- echo ! "test"
-
- expectMsg(Some(testTraceContext))
- }
-
- "propagate the trace context using tell" in new TraceContextEchoFixture {
- echo.tell("test", testActor)
-
- expectMsg(Some(testTraceContext))
- }
-
- "propagate the trace context using ask" in new TraceContextEchoFixture {
- implicit val timeout = Timeout(1 seconds)
- (echo ? "test") pipeTo(testActor)
-
- expectMsg(Some(testTraceContext))
- }
-
- "propagate the trace context to actors behind a router" in new RoutedTraceContextEchoFixture {
- val contexts: Seq[Option[TraceContext]] = for(_ <- 1 to 10) yield Some(tellWithNewContext(echo, "test"))
-
- expectMsgAllOf(contexts: _*)
- }
-
- /*"propagate with many asks" in {
- val echo = system.actorOf(Props[TraceContextEcho])
- val iterations = 50000
- implicit val timeout = Timeout(10 seconds)
-
- val futures = for(_ <- 1 to iterations) yield {
- Tracer.start
- val result = (echo ? "test")
- Tracer.clear
-
- result
- }
-
- val allResults = Await.result(Future.sequence(futures), 10 seconds)
- assert(iterations == allResults.collect {
- case Some(_) => 1
- }.sum)
- }*/
- }
- }
-
- trait TraceContextEchoFixture {
- val testTraceContext = Tracer.newTraceContext()
- val echo = system.actorOf(Props[TraceContextEcho])
-
- Tracer.set(testTraceContext)
- }
-
- trait RoutedTraceContextEchoFixture extends TraceContextEchoFixture {
- override val echo = system.actorOf(Props[TraceContextEcho].withRouter(RoundRobinRouter(nrOfInstances = 10)))
-
- def tellWithNewContext(target: ActorRef, message: Any): TraceContext = {
- val context = Tracer.newTraceContext()
- Tracer.set(context)
-
- target ! message
- context
- }
- }
-
-}
-
-class TraceContextEcho extends Actor {
- def receive = {
- case msg: String ⇒ sender ! Tracer.context()
- }
-}
-
-
diff --git a/kamon-core/src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala
deleted file mode 100644
index 1eab6355..00000000
--- a/kamon-core/src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-package kamon.instrumentation
-
-import org.scalatest.{Matchers, WordSpec}
-import akka.actor.ActorSystem
-import kamon.Kamon
-
-class ActorSystemInstrumentationSpec extends WordSpec with Matchers {
-
- // TODO: Selection filters to exclude unwanted actor systems. Read from configuration.
-
- "the actor system instrumentation" should {
- "register all actor systems created" in {
- val as1 = ActorSystem("as1")
- val as2 = ActorSystem("as2")
-
-
- Kamon.Metric.actorSystem("as1") should not be (None)
- Kamon.Metric.actorSystem("as2") should not be (None)
- Kamon.Metric.actorSystem("unknown") should be (None)
- }
- }
-}
diff --git a/kamon-core/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala
deleted file mode 100644
index 89ef61f3..00000000
--- a/kamon-core/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-package kamon.instrumentation
-
-import org.scalatest.{Matchers, WordSpec}
-import akka.actor.{Actor, Props, ActorSystem}
-import kamon.metric.MetricDirectory
-import kamon.Kamon
-
-class DispatcherInstrumentationSpec extends WordSpec with Matchers{
-
-
- "the dispatcher instrumentation" should {
- "instrument a dispatcher that belongs to a non-filtered actor system" in new SingleDispatcherActorSystem {
- val x = Kamon.Metric.actorSystem("single-dispatcher").get.dispatchers
- (1 to 10).foreach(actor ! _)
-
- val active = x.get("akka.actor.default-dispatcher").get.activeThreadCount.snapshot
- println("Active max: "+active.max)
- println("Active min: "+active.min)
-
- }
- }
-
-
- trait SingleDispatcherActorSystem {
- val actorSystem = ActorSystem("single-dispatcher")
- val actor = actorSystem.actorOf(Props(new Actor {
- def receive = {
- case a => sender ! a;
- }
- }))
-
- }
-}
-
diff --git a/kamon-core/src/test/scala/kamon/instrumentation/MessageQueueInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/MessageQueueInstrumentationSpec.scala
deleted file mode 100644
index cc55ec92..00000000
--- a/kamon-core/src/test/scala/kamon/instrumentation/MessageQueueInstrumentationSpec.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-package kamon.instrumentation
-
-import org.scalatest.WordSpec
-import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram}
-import java.util.concurrent.ConcurrentLinkedQueue
-import akka.dispatch.{UnboundedMessageQueueSemantics, QueueBasedMessageQueue, Envelope}
-import java.util.Queue
-import akka.actor.{ActorSystem, Actor}
-
-class MessageQueueInstrumentationSpec(val actorSystem: ActorSystem) extends WordSpec {
- def this() = this(ActorSystem("MessageQueueInstrumentationSpec"))
-
-
- /*"A MonitoredMessageQueue" should {
- "update the related histogram when a message is enqueued" in {
- new PopulatedMessageQueueFixture {
-
- assert(histogram.getSnapshot.getMax === 0)
-
- for(i <- 1 to 3) { enqueueDummyMessage }
-
- assert(histogram.getCount === 3)
- assert(histogram.getSnapshot.getMax === 3)
- assert(histogram.getSnapshot.getMin === 1)
- }
- }
-
- "update the related histogram when a message is dequeued" in {
- new PopulatedMessageQueueFixture {
- for(i <- 1 to 3) { enqueueDummyMessage }
- assert(histogram.getSnapshot.getMax === 3)
-
- messageQueue.dequeue()
- messageQueue.dequeue()
-
- assert(histogram.getCount === 5)
- assert(histogram.getSnapshot.getMax === 3)
- assert(histogram.getSnapshot.getMin === 1)
- }
- }
- }
-
- trait PopulatedMessageQueueFixture {
-
- val histogram = new Histogram(new ExponentiallyDecayingReservoir())
-/* val delegate = new ConcurrentLinkedQueue[Envelope]() with QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
- final def queue: Queue[Envelope] = this
- }*/
- val messageQueue = new MonitoredMessageQueue(delegate, histogram)
-
- def enqueueDummyMessage = messageQueue.enqueue(Actor.noSender, Envelope("", Actor.noSender, actorSystem))
- }*/
-}
diff --git a/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala
deleted file mode 100644
index 570f64dd..00000000
--- a/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-package kamon.instrumentation
-
-import scala.concurrent.{Await, Promise, Future}
-import org.scalatest.{Matchers, OptionValues, WordSpec}
-import org.scalatest.concurrent.{ScalaFutures, PatienceConfiguration}
-import kamon.{Tracer, Kamon}
-import java.util.UUID
-import scala.util.Success
-import scala.concurrent.duration._
-import java.util.concurrent.TimeUnit
-import akka.actor.ActorSystem
-import kamon.trace.TraceContext
-
-
-class RunnableInstrumentationSpec extends WordSpec with Matchers with ScalaFutures with PatienceConfiguration with OptionValues {
-
- "a instrumented runnable" when {
- "created in a thread that does have a TraceContext" must {
- "preserve the TraceContext" which {
- "should be available during the run method execution" in new FutureWithContextFixture {
-
- whenReady(futureWithContext) { result =>
- result.value should equal(testContext)
- }
- }
-
- "should be available during the execution of onComplete callbacks" in new FutureWithContextFixture {
-
- val onCompleteContext = Promise[Option[TraceContext]]()
-
- Tracer.traceContext.withValue(Some(testContext)) {
- futureWithContext.onComplete({
- case _ => println("Completing second promise from: "+Thread.currentThread().getName + " With Context: " + Tracer.traceContext.value); onCompleteContext.complete(Success(Tracer.traceContext.value))
- })
- }
-
- whenReady(onCompleteContext.future) { result =>
- result should equal(Some(testContext))
- }
- }
- }
- }
-
- "created in a thread that doest have a TraceContext" must {
- "not capture any TraceContext for the body execution" in new FutureWithoutContextFixture{
- whenReady(futureWithoutContext) { result =>
- result should equal(None)
- }
- }
-
- "not make any TraceContext available during the onComplete callback" in new FutureWithoutContextFixture {
- val onCompleteContext = Promise[Option[TraceContext]]()
-
- futureWithoutContext.onComplete {
- case _ => onCompleteContext.complete(Success(Tracer.traceContext.value))
- }
-
- whenReady(onCompleteContext.future) { result =>
- result should equal(None)
- }
- }
- }
- }
-
-
- /**
- * We are using Futures for the test since they exercise Runnables in the back and also resemble the real use case we have.
- */
- implicit val testActorSystem = ActorSystem("test-actorsystem")
- implicit val execContext = testActorSystem.dispatcher
-
- class FutureWithContextFixture {
- val testContext = TraceContext()
-
- var futureWithContext: Future[Option[TraceContext]] = _
- Tracer.traceContext.withValue(Some(testContext)) {
- futureWithContext = Future { Tracer.traceContext.value }
- }
- }
-
- trait FutureWithoutContextFixture {
- val futureWithoutContext = Future { Tracer.traceContext.value }
- }
-}
-
-
diff --git a/kamon-core/src/test/scala/kamon/trace/TraceAggregatorSpec.scala b/kamon-core/src/test/scala/kamon/trace/TraceAggregatorSpec.scala
deleted file mode 100644
index 60b5f06d..00000000
--- a/kamon-core/src/test/scala/kamon/trace/TraceAggregatorSpec.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-package kamon.trace
-
-import org.scalatest.{WordSpecLike, WordSpec}
-import akka.testkit.{TestKitBase, TestKit}
-import akka.actor.ActorSystem
-import scala.concurrent.duration._
-import kamon.trace.UowTracing.{Finish, Rename, Start}
-
-class TraceAggregatorSpec extends TestKit(ActorSystem("TraceAggregatorSpec")) with WordSpecLike {
-
- "a TraceAggregator" should {
- "send a UowTrace message out after receiving a Finish message" in new AggregatorFixture {
- within(1 second) {
- aggregator ! Start()
- aggregator ! Finish()
-
- expectMsg(UowTrace("UNKNOWN", Seq(Start(), Finish())))
- }
- }
-
- "change the uow name after receiving a Rename message" in new AggregatorFixture {
- within(1 second) {
- aggregator ! Start()
- aggregator ! Rename("test-uow")
- aggregator ! Finish()
-
- expectMsg(UowTrace("test-uow", Seq(Start(), Finish())))
- }
- }
- }
-
-
- trait AggregatorFixture {
- val aggregator = system.actorOf(UowTraceAggregator.props(testActor, 10 seconds))
- }
-}