diff options
Diffstat (limited to 'kamon-core/src/main/scala')
12 files changed, 17 insertions, 398 deletions
diff --git a/kamon-core/src/main/scala/kamon/AkkaExtensionSwap.scala b/kamon-core/src/main/scala/kamon/AkkaExtensionSwap.scala new file mode 100644 index 00000000..3d503d54 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/AkkaExtensionSwap.scala @@ -0,0 +1,14 @@ +package kamon + +import akka.actor.{ActorSystem, ExtensionId} +import java.util.concurrent.ConcurrentHashMap + +object AkkaExtensionSwap { + def swap(system: ActorSystem, key: ExtensionId[_], value: Kamon.Extension): Unit = { + val extensionsField = system.getClass.getDeclaredField("extensions") + extensionsField.setAccessible(true) + + val extensions = extensionsField.get(system).asInstanceOf[ConcurrentHashMap[ExtensionId[_], AnyRef]] + extensions.put(key, value) + } +} diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index 8c934f60..5d0d77a3 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -1,16 +1,6 @@ package kamon import akka.actor._ -import kamon.metric.{HistogramSnapshot, ActorSystemMetrics} -import scala.concurrent.duration.FiniteDuration -import scala.collection.concurrent.TrieMap -import kamon.instrumentation.{SimpleContextPassingInstrumentation, ActorInstrumentationConfiguration} -import kamon.metric.ActorSystemMetrics - - -object Instrument { - val instrumentation: ActorInstrumentationConfiguration = new SimpleContextPassingInstrumentation -} object Kamon { trait Extension extends akka.actor.Extension { @@ -18,8 +8,5 @@ object Kamon { } def apply[T <: Extension](key: ExtensionId[T])(implicit system: ActorSystem): ActorRef = key(system).manager - - - implicit lazy val actorSystem = ActorSystem("kamon") } diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala deleted file mode 100644 index 4e078201..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala +++ /dev/null @@ -1,42 +0,0 @@ -package kamon.instrumentation - -import akka.actor.{Props, ActorSystem, ActorRef} -import akka.dispatch.{MessageDispatcher, Envelope} -import kamon.{Tracer} -import kamon.instrumentation.SimpleContextPassingInstrumentation.SimpleTraceMessage -import kamon.trace.TraceContext - -trait ActorInstrumentationConfiguration { - def sendMessageTransformation(from: ActorRef, to: ActorRef, message: Any): Any - def receiveInvokeInstrumentation(system: ActorSystem, self: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): ActorReceiveInvokeInstrumentation -} - - -trait ActorReceiveInvokeInstrumentation { - def preReceive(envelope: Envelope): (Envelope, Option[TraceContext]) -} - -object ActorReceiveInvokeInstrumentation { - val noopPreReceive = new ActorReceiveInvokeInstrumentation{ - def preReceive(envelope: Envelope): (Envelope, Option[TraceContext]) = (envelope, None) - } -} - -class SimpleContextPassingInstrumentation extends ActorInstrumentationConfiguration { - def sendMessageTransformation(from: ActorRef, to: ActorRef, message: Any): Any = SimpleTraceMessage(message, Tracer.context) - - def receiveInvokeInstrumentation(system: ActorSystem, self: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): ActorReceiveInvokeInstrumentation = { - new ActorReceiveInvokeInstrumentation { - def preReceive(envelope: Envelope): (Envelope, Option[TraceContext]) = envelope match { - case env @ Envelope(SimpleTraceMessage(msg, ctx), _) => (env.copy(message = msg), ctx) - case anyOther => (anyOther, None) - } - } - } -} - -object SimpleContextPassingInstrumentation { - case class SimpleTraceMessage(message: Any, context: Option[TraceContext]) -} - - diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingInstrumentation.scala deleted file mode 100644 index 9b53bd5d..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingInstrumentation.scala +++ /dev/null @@ -1,32 +0,0 @@ -package kamon.instrumentation - -import org.aspectj.lang.annotation.{Around, Pointcut, DeclareMixin, Aspect} -import kamon.{Tracer} -import org.aspectj.lang.ProceedingJoinPoint -import org.slf4j.MDC -import kamon.trace.TraceContext - -@Aspect -class ActorLoggingInstrumentation { - - - @DeclareMixin("akka.event.Logging.LogEvent+") - def traceContextMixin: ContextAware = new ContextAware { - def traceContext: Option[TraceContext] = Tracer.context() - } - - @Pointcut("execution(* akka.event.slf4j.Slf4jLogger.withMdc(..)) && args(logSource, logEvent, logStatement)") - def withMdcInvocation(logSource: String, logEvent: ContextAware, logStatement: () => _): Unit = {} - - @Around("withMdcInvocation(logSource, logEvent, logStatement)") - def putTraceContextInMDC(pjp: ProceedingJoinPoint, logSource: String, logEvent: ContextAware, logStatement: () => _): Unit = { - logEvent.traceContext match { - case Some(ctx) => - MDC.put("uow", ctx.uow) - pjp.proceed() - MDC.remove("uow") - - case None => pjp.proceed() - } - } -} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala deleted file mode 100644 index 9b5ce0a4..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala +++ /dev/null @@ -1,50 +0,0 @@ -package kamon.instrumentation - -import org.aspectj.lang.annotation._ -import org.aspectj.lang.ProceedingJoinPoint -import akka.actor.{Props, ActorSystem, ActorRef} -import kamon.{Tracer} -import akka.dispatch.{Envelope, MessageDispatcher} -import com.codahale.metrics.Timer -import scala.Some -import kamon.trace.context.TracingAwareContext -import kamon.trace.TraceContext - -case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context) -case class DefaultTracingAwareEnvelopeContext(traceContext: Option[TraceContext] = Tracer.traceContext.value, timestamp: Long = System.nanoTime) extends TracingAwareContext - -@Aspect -class ActorCellInvokeInstrumentation { - - @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, dispatcher, parent)") - def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {} - - @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && args(envelope)") - def invokingActorBehaviourAtActorCell(envelope: Envelope) = {} - - @Around("invokingActorBehaviourAtActorCell(envelope)") - def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = { - //safe cast - val msgContext = envelope.asInstanceOf[TracingAwareContext].traceContext - - Tracer.traceContext.withValue(msgContext) { - pjp.proceed() - } - } -} - -@Aspect -class EnvelopeTracingContext { - - @DeclareMixin("akka.dispatch.Envelope") - def mixin: TracingAwareContext = DefaultTracingAwareEnvelopeContext() - - @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)") - def requestRecordInit(ctx: TracingAwareContext): Unit = {} - - @After("requestRecordInit(ctx)") - def whenCreatedRequestRecord(ctx: TracingAwareContext): Unit = { - // Necessary to force the initialization of TracingAwareRequestContext at the moment of creation. - ctx.traceContext - } -} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala index b4f8a475..3a091775 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala @@ -21,7 +21,7 @@ class ActorSystemInstrumentation { @After("actorSystemInstantiation(name, applicationConfig, classLoader)") def registerActorSystem(name: String, applicationConfig: Config, classLoader: ClassLoader): Unit = { - Kamon.Metric.registerActorSystem(name) + //Kamon.Metric.registerActorSystem(name) } } @@ -35,7 +35,7 @@ class ForkJoinPoolInstrumentation { @After("forkJoinPoolInstantiation(parallelism, threadFactory, exceptionHandler)") def initializeMetrics(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler): Unit = { - val (actorSystemName, dispatcherName) = threadFactory match { + /*val (actorSystemName, dispatcherName) = threadFactory match { case mtf: MonitorableThreadFactory => splitName(mtf.name, Kamon.Metric.actorSystemNames) case _ => ("Unknown", "Unknown") } @@ -45,7 +45,7 @@ class ForkJoinPoolInstrumentation { activeThreadsHistogram = m.activeThreadCount poolSizeHistogram = m.poolSize println(s"Registered $dispatcherName for actor system $actorSystemName") - } + }*/ } def splitName(threadFactoryName: String, knownActorSystems: List[String]): (String, String) = { diff --git a/kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala b/kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala deleted file mode 100644 index 70f3e54a..00000000 --- a/kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala +++ /dev/null @@ -1,51 +0,0 @@ -package kamon.metric - -import com.codahale.metrics -import metrics._ -import java.util.concurrent.TimeUnit -import java.util -import com.newrelic.api.agent.NewRelic -import scala.collection.JavaConverters._ - - -class NewRelicReporter(registry: MetricRegistry, name: String,filter: MetricFilter, rateUnit: TimeUnit, durationUnit: TimeUnit) extends ScheduledReporter(registry, name, filter, rateUnit, durationUnit) { - - - - private[NewRelicReporter] def processMeter(name: String, meter: Meter) { - NewRelic.recordMetric("Custom/Actor/MessagesPerSecond", meter.getMeanRate().toFloat) - } - - private[NewRelicReporter] def processCounter(name:String, counter:Counter) { - println(s"Logging to NewRelic: ${counter.getCount}") - - } - - -/* def processGauge(name: String, gauge: Gauge[_]) = { - println(s"the value is: "+gauge.getValue) - NewRelic.recordMetric("Custom/ActorSystem/activeCount", gauge.getValue.asInstanceOf[Float]) - }*/ - - - def report(gauges: util.SortedMap[String, Gauge[_]], counters: util.SortedMap[String, Counter], histograms: util.SortedMap[String, metrics.Histogram], meters: util.SortedMap[String, Meter], timers: util.SortedMap[String, Timer]) { - //Process Meters - meters.asScala.map{case(name, meter) => processMeter(name, meter)} - - //Process Meters - counters.asScala.map{case(name, counter) => processCounter(name, counter)} - - // Gauges - gauges.asScala.foreach{ case (name, gauge) => { - val measure: Float = gauge.getValue.asInstanceOf[Number].floatValue() - val fullMetricName = "Custom" + name - NewRelic.recordMetric(fullMetricName, measure) - }} - } - - -} - -object NewRelicReporter { - def apply(registry: MetricRegistry) = new NewRelicReporter(registry, "NewRelic-reporter", metrics.MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.MILLISECONDS) -}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala b/kamon-core/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala deleted file mode 100644 index 4bc49496..00000000 --- a/kamon-core/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala +++ /dev/null @@ -1,18 +0,0 @@ -package kamon.newrelic - -import akka.actor.Actor -import akka.event.Logging.Error -import akka.event.Logging.{LoggerInitialized, InitializeLogger} -import com.newrelic.api.agent.NewRelic -import NewRelic.noticeError - -class NewRelicErrorLogger extends Actor { - def receive = { - case InitializeLogger(_) => sender ! LoggerInitialized - case error @ Error(cause, logSource, logClass, message) => notifyError(error) - } - - def notifyError(error: Error): Unit = { - noticeError(error.cause) - } -} diff --git a/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala b/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala deleted file mode 100644 index 106f27e2..00000000 --- a/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala +++ /dev/null @@ -1,52 +0,0 @@ -package kamon.newrelic - -import akka.actor.Actor -import kamon.trace.UowTrace -import com.newrelic.api.agent.{Response, Request, Trace, NewRelic} -import kamon.trace.UowTracing.{WebExternal, WebExternalFinish, WebExternalStart} -import java.util -import java.util.Date - - -class NewRelicReporting extends Actor { - def receive = { - case trace: UowTrace => recordTransaction(trace) - } - - def recordTransaction(uowTrace: UowTrace): Unit = { - val time = ((uowTrace.segments.last.timestamp - uowTrace.segments.head.timestamp)/1E9) - - NewRelic.recordMetric("WebTransaction/Custom" + uowTrace.name, time.toFloat ) - NewRelic.recordMetric("WebTransaction", time.toFloat) - NewRelic.recordMetric("HttpDispatcher", time.toFloat) - - uowTrace.segments.collect { case we: WebExternal => we }.foreach { webExternalTrace => - val external = ((webExternalTrace.finish - webExternalTrace.start)/1E9).toFloat - - println("Web External: " + webExternalTrace) - NewRelic.recordMetric(s"External/${webExternalTrace.host}/http", external) - NewRelic.recordMetric(s"External/${webExternalTrace.host}/all", external) - NewRelic.recordMetric(s"External/${webExternalTrace.host}/http/WebTransaction/Custom" + uowTrace.name, external) - } - - - val allExternals = uowTrace.segments.collect { case we: WebExternal => we } sortBy(_.timestamp) - - - def measureExternal(accum: Long, lastEnd: Long, segments: Seq[WebExternal]): Long = segments match { - case Nil => accum - case head :: tail => - if(head.start > lastEnd) - measureExternal(accum + (head.finish-head.start), head.finish, tail) - else - measureExternal(accum + (head.finish-lastEnd), head.finish, tail) - } - - val external = measureExternal(0, 0, allExternals) / 1E9 - - - NewRelic.recordMetric(s"External/all", external.toFloat) - NewRelic.recordMetric(s"External/allWeb", external.toFloat) - - } -} diff --git a/kamon-core/src/main/scala/kamon/trace/UowDirectives.scala b/kamon-core/src/main/scala/kamon/trace/UowDirectives.scala deleted file mode 100644 index 392f53b8..00000000 --- a/kamon-core/src/main/scala/kamon/trace/UowDirectives.scala +++ /dev/null @@ -1,28 +0,0 @@ -package kamon.trace - -import spray.routing.directives.BasicDirectives -import spray.routing._ -import kamon.Tracer -import java.util.concurrent.atomic.AtomicLong -import scala.util.Try -import java.net.InetAddress - -trait UowDirectives extends BasicDirectives { - def uow: Directive0 = mapRequest { request => - val uowHeader = request.headers.find(_.name == "X-UOW") - - val generatedUow = uowHeader.map(_.value).getOrElse(UowDirectives.newUow) - Tracer.set(Tracer.context().getOrElse(Tracer.newTraceContext()).copy(uow = generatedUow)) - - request - } -} - -object UowDirectives { - val uowCounter = new AtomicLong - - val hostnamePrefix = Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-localhost") - - def newUow = "%s-%s".format(hostnamePrefix, uowCounter.incrementAndGet()) - -}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/context/TracingAwareContext.scala b/kamon-core/src/main/scala/kamon/trace/context/TracingAwareContext.scala deleted file mode 100644 index c8d0d4f0..00000000 --- a/kamon-core/src/main/scala/kamon/trace/context/TracingAwareContext.scala +++ /dev/null @@ -1,8 +0,0 @@ -package kamon.trace.context - -import kamon.trace.TraceContext - -trait TracingAwareContext { - def traceContext: Option[TraceContext] - def timestamp: Long -}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/test/SimpleRequestProcessor.scala b/kamon-core/src/main/scala/test/SimpleRequestProcessor.scala deleted file mode 100644 index da3c6c6a..00000000 --- a/kamon-core/src/main/scala/test/SimpleRequestProcessor.scala +++ /dev/null @@ -1,101 +0,0 @@ -package test - -import akka.actor._ -import kamon.Tracer -import spray.routing.SimpleRoutingApp -import akka.util.Timeout -import spray.httpx.RequestBuilding -import scala.concurrent.{Await, Future} -import kamon.trace.UowDirectives - -object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuilding with UowDirectives { - import scala.concurrent.duration._ - import spray.client.pipelining._ - import akka.pattern.ask - - implicit val system = ActorSystem("test") - import system.dispatcher - - val act = system.actorOf(Props(new Actor { - def receive: Actor.Receive = { case any => sender ! any } - }), "com.despegar-2:[]s-w@&,*") - - implicit val timeout = Timeout(30 seconds) - - val pipeline = sendReceive - val replier = system.actorOf(Props[Replier]) - - startServer(interface = "localhost", port = 9090) { - get { - path("test"){ - uow { - complete { - val futures = pipeline(Get("http://10.254.209.14:8000/")).map(r => "Ok") :: pipeline(Get("http://10.254.209.14:8000/")).map(r => "Ok") :: Nil - - Future.sequence(futures).map(l => "Ok") - } - } - } ~ - path("reply" / Segment) { reqID => - uow { - complete { - if (Tracer.context().isEmpty) - println("ROUTE NO CONTEXT") - - (replier ? reqID).mapTo[String] - } - } - } ~ - path("ok") { - complete("ok") - } ~ - path("future") { - dynamic { - complete(Future { "OK" }) - } - } ~ - path("error") { - complete { - throw new NullPointerException - "okk" - } - } - } - } - -} - -object Verifier extends App { - - def go: Unit = { - import scala.concurrent.duration._ - import spray.client.pipelining._ - - implicit val system = ActorSystem("test") - import system.dispatcher - - implicit val timeout = Timeout(30 seconds) - - val pipeline = sendReceive - - val futures = Future.sequence(for(i <- 1 to 500) yield { - pipeline(Get("http://127.0.0.1:9090/reply/"+i)).map(r => r.entity.asString == i.toString) - }) - println("Everything is: "+ Await.result(futures, 10 seconds).forall(a => a == true)) - } - - - - -} - -class Replier extends Actor with ActorLogging { - def receive = { - case anything => - if(Tracer.context.isEmpty) - log.warning("PROCESSING A MESSAGE WITHOUT CONTEXT") - - log.info("Processing at the Replier") - sender ! anything - } -} |