aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-core/src/main/scala')
-rw-r--r--kamon-core/src/main/scala/kamon/AkkaExtensionSwap.scala14
-rw-r--r--kamon-core/src/main/scala/kamon/Kamon.scala13
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala42
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingInstrumentation.scala32
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala50
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala6
-rw-r--r--kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala51
-rw-r--r--kamon-core/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala18
-rw-r--r--kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala52
-rw-r--r--kamon-core/src/main/scala/kamon/trace/UowDirectives.scala28
-rw-r--r--kamon-core/src/main/scala/kamon/trace/context/TracingAwareContext.scala8
-rw-r--r--kamon-core/src/main/scala/test/SimpleRequestProcessor.scala101
12 files changed, 17 insertions, 398 deletions
diff --git a/kamon-core/src/main/scala/kamon/AkkaExtensionSwap.scala b/kamon-core/src/main/scala/kamon/AkkaExtensionSwap.scala
new file mode 100644
index 00000000..3d503d54
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/AkkaExtensionSwap.scala
@@ -0,0 +1,14 @@
+package kamon
+
+import akka.actor.{ActorSystem, ExtensionId}
+import java.util.concurrent.ConcurrentHashMap
+
+object AkkaExtensionSwap {
+ def swap(system: ActorSystem, key: ExtensionId[_], value: Kamon.Extension): Unit = {
+ val extensionsField = system.getClass.getDeclaredField("extensions")
+ extensionsField.setAccessible(true)
+
+ val extensions = extensionsField.get(system).asInstanceOf[ConcurrentHashMap[ExtensionId[_], AnyRef]]
+ extensions.put(key, value)
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala
index 8c934f60..5d0d77a3 100644
--- a/kamon-core/src/main/scala/kamon/Kamon.scala
+++ b/kamon-core/src/main/scala/kamon/Kamon.scala
@@ -1,16 +1,6 @@
package kamon
import akka.actor._
-import kamon.metric.{HistogramSnapshot, ActorSystemMetrics}
-import scala.concurrent.duration.FiniteDuration
-import scala.collection.concurrent.TrieMap
-import kamon.instrumentation.{SimpleContextPassingInstrumentation, ActorInstrumentationConfiguration}
-import kamon.metric.ActorSystemMetrics
-
-
-object Instrument {
- val instrumentation: ActorInstrumentationConfiguration = new SimpleContextPassingInstrumentation
-}
object Kamon {
trait Extension extends akka.actor.Extension {
@@ -18,8 +8,5 @@ object Kamon {
}
def apply[T <: Extension](key: ExtensionId[T])(implicit system: ActorSystem): ActorRef = key(system).manager
-
-
- implicit lazy val actorSystem = ActorSystem("kamon")
}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala
deleted file mode 100644
index 4e078201..00000000
--- a/kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-package kamon.instrumentation
-
-import akka.actor.{Props, ActorSystem, ActorRef}
-import akka.dispatch.{MessageDispatcher, Envelope}
-import kamon.{Tracer}
-import kamon.instrumentation.SimpleContextPassingInstrumentation.SimpleTraceMessage
-import kamon.trace.TraceContext
-
-trait ActorInstrumentationConfiguration {
- def sendMessageTransformation(from: ActorRef, to: ActorRef, message: Any): Any
- def receiveInvokeInstrumentation(system: ActorSystem, self: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): ActorReceiveInvokeInstrumentation
-}
-
-
-trait ActorReceiveInvokeInstrumentation {
- def preReceive(envelope: Envelope): (Envelope, Option[TraceContext])
-}
-
-object ActorReceiveInvokeInstrumentation {
- val noopPreReceive = new ActorReceiveInvokeInstrumentation{
- def preReceive(envelope: Envelope): (Envelope, Option[TraceContext]) = (envelope, None)
- }
-}
-
-class SimpleContextPassingInstrumentation extends ActorInstrumentationConfiguration {
- def sendMessageTransformation(from: ActorRef, to: ActorRef, message: Any): Any = SimpleTraceMessage(message, Tracer.context)
-
- def receiveInvokeInstrumentation(system: ActorSystem, self: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): ActorReceiveInvokeInstrumentation = {
- new ActorReceiveInvokeInstrumentation {
- def preReceive(envelope: Envelope): (Envelope, Option[TraceContext]) = envelope match {
- case env @ Envelope(SimpleTraceMessage(msg, ctx), _) => (env.copy(message = msg), ctx)
- case anyOther => (anyOther, None)
- }
- }
- }
-}
-
-object SimpleContextPassingInstrumentation {
- case class SimpleTraceMessage(message: Any, context: Option[TraceContext])
-}
-
-
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingInstrumentation.scala
deleted file mode 100644
index 9b53bd5d..00000000
--- a/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingInstrumentation.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-package kamon.instrumentation
-
-import org.aspectj.lang.annotation.{Around, Pointcut, DeclareMixin, Aspect}
-import kamon.{Tracer}
-import org.aspectj.lang.ProceedingJoinPoint
-import org.slf4j.MDC
-import kamon.trace.TraceContext
-
-@Aspect
-class ActorLoggingInstrumentation {
-
-
- @DeclareMixin("akka.event.Logging.LogEvent+")
- def traceContextMixin: ContextAware = new ContextAware {
- def traceContext: Option[TraceContext] = Tracer.context()
- }
-
- @Pointcut("execution(* akka.event.slf4j.Slf4jLogger.withMdc(..)) && args(logSource, logEvent, logStatement)")
- def withMdcInvocation(logSource: String, logEvent: ContextAware, logStatement: () => _): Unit = {}
-
- @Around("withMdcInvocation(logSource, logEvent, logStatement)")
- def putTraceContextInMDC(pjp: ProceedingJoinPoint, logSource: String, logEvent: ContextAware, logStatement: () => _): Unit = {
- logEvent.traceContext match {
- case Some(ctx) =>
- MDC.put("uow", ctx.uow)
- pjp.proceed()
- MDC.remove("uow")
-
- case None => pjp.proceed()
- }
- }
-}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
deleted file mode 100644
index 9b5ce0a4..00000000
--- a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-package kamon.instrumentation
-
-import org.aspectj.lang.annotation._
-import org.aspectj.lang.ProceedingJoinPoint
-import akka.actor.{Props, ActorSystem, ActorRef}
-import kamon.{Tracer}
-import akka.dispatch.{Envelope, MessageDispatcher}
-import com.codahale.metrics.Timer
-import scala.Some
-import kamon.trace.context.TracingAwareContext
-import kamon.trace.TraceContext
-
-case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context)
-case class DefaultTracingAwareEnvelopeContext(traceContext: Option[TraceContext] = Tracer.traceContext.value, timestamp: Long = System.nanoTime) extends TracingAwareContext
-
-@Aspect
-class ActorCellInvokeInstrumentation {
-
- @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, dispatcher, parent)")
- def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {}
-
- @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && args(envelope)")
- def invokingActorBehaviourAtActorCell(envelope: Envelope) = {}
-
- @Around("invokingActorBehaviourAtActorCell(envelope)")
- def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = {
- //safe cast
- val msgContext = envelope.asInstanceOf[TracingAwareContext].traceContext
-
- Tracer.traceContext.withValue(msgContext) {
- pjp.proceed()
- }
- }
-}
-
-@Aspect
-class EnvelopeTracingContext {
-
- @DeclareMixin("akka.dispatch.Envelope")
- def mixin: TracingAwareContext = DefaultTracingAwareEnvelopeContext()
-
- @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)")
- def requestRecordInit(ctx: TracingAwareContext): Unit = {}
-
- @After("requestRecordInit(ctx)")
- def whenCreatedRequestRecord(ctx: TracingAwareContext): Unit = {
- // Necessary to force the initialization of TracingAwareRequestContext at the moment of creation.
- ctx.traceContext
- }
-}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
index b4f8a475..3a091775 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
@@ -21,7 +21,7 @@ class ActorSystemInstrumentation {
@After("actorSystemInstantiation(name, applicationConfig, classLoader)")
def registerActorSystem(name: String, applicationConfig: Config, classLoader: ClassLoader): Unit = {
- Kamon.Metric.registerActorSystem(name)
+ //Kamon.Metric.registerActorSystem(name)
}
}
@@ -35,7 +35,7 @@ class ForkJoinPoolInstrumentation {
@After("forkJoinPoolInstantiation(parallelism, threadFactory, exceptionHandler)")
def initializeMetrics(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler): Unit = {
- val (actorSystemName, dispatcherName) = threadFactory match {
+ /*val (actorSystemName, dispatcherName) = threadFactory match {
case mtf: MonitorableThreadFactory => splitName(mtf.name, Kamon.Metric.actorSystemNames)
case _ => ("Unknown", "Unknown")
}
@@ -45,7 +45,7 @@ class ForkJoinPoolInstrumentation {
activeThreadsHistogram = m.activeThreadCount
poolSizeHistogram = m.poolSize
println(s"Registered $dispatcherName for actor system $actorSystemName")
- }
+ }*/
}
def splitName(threadFactoryName: String, knownActorSystems: List[String]): (String, String) = {
diff --git a/kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala b/kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala
deleted file mode 100644
index 70f3e54a..00000000
--- a/kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-package kamon.metric
-
-import com.codahale.metrics
-import metrics._
-import java.util.concurrent.TimeUnit
-import java.util
-import com.newrelic.api.agent.NewRelic
-import scala.collection.JavaConverters._
-
-
-class NewRelicReporter(registry: MetricRegistry, name: String,filter: MetricFilter, rateUnit: TimeUnit, durationUnit: TimeUnit) extends ScheduledReporter(registry, name, filter, rateUnit, durationUnit) {
-
-
-
- private[NewRelicReporter] def processMeter(name: String, meter: Meter) {
- NewRelic.recordMetric("Custom/Actor/MessagesPerSecond", meter.getMeanRate().toFloat)
- }
-
- private[NewRelicReporter] def processCounter(name:String, counter:Counter) {
- println(s"Logging to NewRelic: ${counter.getCount}")
-
- }
-
-
-/* def processGauge(name: String, gauge: Gauge[_]) = {
- println(s"the value is: "+gauge.getValue)
- NewRelic.recordMetric("Custom/ActorSystem/activeCount", gauge.getValue.asInstanceOf[Float])
- }*/
-
-
- def report(gauges: util.SortedMap[String, Gauge[_]], counters: util.SortedMap[String, Counter], histograms: util.SortedMap[String, metrics.Histogram], meters: util.SortedMap[String, Meter], timers: util.SortedMap[String, Timer]) {
- //Process Meters
- meters.asScala.map{case(name, meter) => processMeter(name, meter)}
-
- //Process Meters
- counters.asScala.map{case(name, counter) => processCounter(name, counter)}
-
- // Gauges
- gauges.asScala.foreach{ case (name, gauge) => {
- val measure: Float = gauge.getValue.asInstanceOf[Number].floatValue()
- val fullMetricName = "Custom" + name
- NewRelic.recordMetric(fullMetricName, measure)
- }}
- }
-
-
-}
-
-object NewRelicReporter {
- def apply(registry: MetricRegistry) = new NewRelicReporter(registry, "NewRelic-reporter", metrics.MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.MILLISECONDS)
-} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala b/kamon-core/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala
deleted file mode 100644
index 4bc49496..00000000
--- a/kamon-core/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala
+++ /dev/null
@@ -1,18 +0,0 @@
-package kamon.newrelic
-
-import akka.actor.Actor
-import akka.event.Logging.Error
-import akka.event.Logging.{LoggerInitialized, InitializeLogger}
-import com.newrelic.api.agent.NewRelic
-import NewRelic.noticeError
-
-class NewRelicErrorLogger extends Actor {
- def receive = {
- case InitializeLogger(_) => sender ! LoggerInitialized
- case error @ Error(cause, logSource, logClass, message) => notifyError(error)
- }
-
- def notifyError(error: Error): Unit = {
- noticeError(error.cause)
- }
-}
diff --git a/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala b/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala
deleted file mode 100644
index 106f27e2..00000000
--- a/kamon-core/src/main/scala/kamon/newrelic/NewRelicReporting.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-package kamon.newrelic
-
-import akka.actor.Actor
-import kamon.trace.UowTrace
-import com.newrelic.api.agent.{Response, Request, Trace, NewRelic}
-import kamon.trace.UowTracing.{WebExternal, WebExternalFinish, WebExternalStart}
-import java.util
-import java.util.Date
-
-
-class NewRelicReporting extends Actor {
- def receive = {
- case trace: UowTrace => recordTransaction(trace)
- }
-
- def recordTransaction(uowTrace: UowTrace): Unit = {
- val time = ((uowTrace.segments.last.timestamp - uowTrace.segments.head.timestamp)/1E9)
-
- NewRelic.recordMetric("WebTransaction/Custom" + uowTrace.name, time.toFloat )
- NewRelic.recordMetric("WebTransaction", time.toFloat)
- NewRelic.recordMetric("HttpDispatcher", time.toFloat)
-
- uowTrace.segments.collect { case we: WebExternal => we }.foreach { webExternalTrace =>
- val external = ((webExternalTrace.finish - webExternalTrace.start)/1E9).toFloat
-
- println("Web External: " + webExternalTrace)
- NewRelic.recordMetric(s"External/${webExternalTrace.host}/http", external)
- NewRelic.recordMetric(s"External/${webExternalTrace.host}/all", external)
- NewRelic.recordMetric(s"External/${webExternalTrace.host}/http/WebTransaction/Custom" + uowTrace.name, external)
- }
-
-
- val allExternals = uowTrace.segments.collect { case we: WebExternal => we } sortBy(_.timestamp)
-
-
- def measureExternal(accum: Long, lastEnd: Long, segments: Seq[WebExternal]): Long = segments match {
- case Nil => accum
- case head :: tail =>
- if(head.start > lastEnd)
- measureExternal(accum + (head.finish-head.start), head.finish, tail)
- else
- measureExternal(accum + (head.finish-lastEnd), head.finish, tail)
- }
-
- val external = measureExternal(0, 0, allExternals) / 1E9
-
-
- NewRelic.recordMetric(s"External/all", external.toFloat)
- NewRelic.recordMetric(s"External/allWeb", external.toFloat)
-
- }
-}
diff --git a/kamon-core/src/main/scala/kamon/trace/UowDirectives.scala b/kamon-core/src/main/scala/kamon/trace/UowDirectives.scala
deleted file mode 100644
index 392f53b8..00000000
--- a/kamon-core/src/main/scala/kamon/trace/UowDirectives.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-package kamon.trace
-
-import spray.routing.directives.BasicDirectives
-import spray.routing._
-import kamon.Tracer
-import java.util.concurrent.atomic.AtomicLong
-import scala.util.Try
-import java.net.InetAddress
-
-trait UowDirectives extends BasicDirectives {
- def uow: Directive0 = mapRequest { request =>
- val uowHeader = request.headers.find(_.name == "X-UOW")
-
- val generatedUow = uowHeader.map(_.value).getOrElse(UowDirectives.newUow)
- Tracer.set(Tracer.context().getOrElse(Tracer.newTraceContext()).copy(uow = generatedUow))
-
- request
- }
-}
-
-object UowDirectives {
- val uowCounter = new AtomicLong
-
- val hostnamePrefix = Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-localhost")
-
- def newUow = "%s-%s".format(hostnamePrefix, uowCounter.incrementAndGet())
-
-} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/trace/context/TracingAwareContext.scala b/kamon-core/src/main/scala/kamon/trace/context/TracingAwareContext.scala
deleted file mode 100644
index c8d0d4f0..00000000
--- a/kamon-core/src/main/scala/kamon/trace/context/TracingAwareContext.scala
+++ /dev/null
@@ -1,8 +0,0 @@
-package kamon.trace.context
-
-import kamon.trace.TraceContext
-
-trait TracingAwareContext {
- def traceContext: Option[TraceContext]
- def timestamp: Long
-} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/test/SimpleRequestProcessor.scala b/kamon-core/src/main/scala/test/SimpleRequestProcessor.scala
deleted file mode 100644
index da3c6c6a..00000000
--- a/kamon-core/src/main/scala/test/SimpleRequestProcessor.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-package test
-
-import akka.actor._
-import kamon.Tracer
-import spray.routing.SimpleRoutingApp
-import akka.util.Timeout
-import spray.httpx.RequestBuilding
-import scala.concurrent.{Await, Future}
-import kamon.trace.UowDirectives
-
-object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuilding with UowDirectives {
- import scala.concurrent.duration._
- import spray.client.pipelining._
- import akka.pattern.ask
-
- implicit val system = ActorSystem("test")
- import system.dispatcher
-
- val act = system.actorOf(Props(new Actor {
- def receive: Actor.Receive = { case any => sender ! any }
- }), "com.despegar-2:[]s-w@&,*")
-
- implicit val timeout = Timeout(30 seconds)
-
- val pipeline = sendReceive
- val replier = system.actorOf(Props[Replier])
-
- startServer(interface = "localhost", port = 9090) {
- get {
- path("test"){
- uow {
- complete {
- val futures = pipeline(Get("http://10.254.209.14:8000/")).map(r => "Ok") :: pipeline(Get("http://10.254.209.14:8000/")).map(r => "Ok") :: Nil
-
- Future.sequence(futures).map(l => "Ok")
- }
- }
- } ~
- path("reply" / Segment) { reqID =>
- uow {
- complete {
- if (Tracer.context().isEmpty)
- println("ROUTE NO CONTEXT")
-
- (replier ? reqID).mapTo[String]
- }
- }
- } ~
- path("ok") {
- complete("ok")
- } ~
- path("future") {
- dynamic {
- complete(Future { "OK" })
- }
- } ~
- path("error") {
- complete {
- throw new NullPointerException
- "okk"
- }
- }
- }
- }
-
-}
-
-object Verifier extends App {
-
- def go: Unit = {
- import scala.concurrent.duration._
- import spray.client.pipelining._
-
- implicit val system = ActorSystem("test")
- import system.dispatcher
-
- implicit val timeout = Timeout(30 seconds)
-
- val pipeline = sendReceive
-
- val futures = Future.sequence(for(i <- 1 to 500) yield {
- pipeline(Get("http://127.0.0.1:9090/reply/"+i)).map(r => r.entity.asString == i.toString)
- })
- println("Everything is: "+ Await.result(futures, 10 seconds).forall(a => a == true))
- }
-
-
-
-
-}
-
-class Replier extends Actor with ActorLogging {
- def receive = {
- case anything =>
- if(Tracer.context.isEmpty)
- log.warning("PROCESSING A MESSAGE WITHOUT CONTEXT")
-
- log.info("Processing at the Replier")
- sender ! anything
- }
-}