From 227c2dfe6cb8b7e175ad72285dfdfbd15672be24 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Wed, 30 Oct 2013 09:04:25 -0300 Subject: move things all over the place --- kamon-core/src/main/scala/kamon/Kamon.scala | 56 +++------- kamon-core/src/main/scala/kamon/TraceContext.scala | 64 ------------ .../instrumentation/ActorInstrumentation.scala | 3 +- .../ActorLoggingInstrumentation.scala | 3 +- .../ActorRefTellInstrumentation.scala | 3 +- .../instrumentation/RunnableInstrumentation.scala | 3 +- .../SprayServerInstrumentation.scala | 115 --------------------- .../kamon/trace/context/TracingAwareContext.scala | 2 +- .../instrumentation/ActorInstrumentationSpec.scala | 3 +- .../RunnableInstrumentationSpec.scala | 3 +- 10 files changed, 28 insertions(+), 227 deletions(-) delete mode 100644 kamon-core/src/main/scala/kamon/TraceContext.scala delete mode 100644 kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala (limited to 'kamon-core/src') diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index fb1b2393..75ef1efe 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -1,12 +1,11 @@ package kamon -import akka.actor.{Actor, Props, ActorSystem} +import akka.actor._ import kamon.metric.{HistogramSnapshot, ActorSystemMetrics} import scala.concurrent.duration.FiniteDuration -import com.newrelic.api.agent.NewRelic import scala.collection.concurrent.TrieMap import kamon.instrumentation.{SimpleContextPassingInstrumentation, ActorInstrumentationConfiguration} -import scala.util.DynamicVariable +import kamon.metric.ActorSystemMetrics object Instrument { @@ -14,6 +13,19 @@ object Instrument { } object Kamon { + trait Extension extends akka.actor.Extension { + def manager: ActorRef + } + + def apply[T <: Extension](key: ExtensionId[T])(implicit system: ActorSystem): ActorRef = key(system).manager + + + + + + + + implicit lazy val actorSystem = ActorSystem("kamon") object Metric { @@ -32,18 +44,6 @@ object Kamon { } -object Tracer { - val traceContext = new DynamicVariable[Option[TraceContext]](None) - - - def context() = traceContext.value - def set(ctx: TraceContext) = traceContext.value = Some(ctx) - - def start = set(newTraceContext) - def newTraceContext(): TraceContext = TraceContext()(Kamon.actorSystem) -} - - class MetricManager extends Actor { implicit val ec = context.system.dispatcher @@ -70,29 +70,3 @@ class MetricManager extends Actor { case class RegisterForAllDispatchers(frequency: FiniteDuration) case class DispatcherMetrics(actorSystem: String, dispatcher: String, activeThreads: HistogramSnapshot, poolSize: HistogramSnapshot, queueSize: HistogramSnapshot) - - - - - - -class NewrelicReporterActor extends Actor { - import scala.concurrent.duration._ - - //Kamon.metricManager ! RegisterForAllDispatchers(5 seconds) - - def receive = { - case DispatcherMetrics(actorSystem, dispatcher, activeThreads, poolSize, queueSize) => { - /*println("PUBLISHED DISPATCHER STATS") - println(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/active =>" + activeThreads.median.toFloat) - println(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/inactive =>" + (poolSize.median.toFloat-activeThreads.median.toFloat)) - println(s"Custom/$actorSystem/Dispatcher/$dispatcher/Queue =>" + queueSize.median.toFloat)*/ - - - NewRelic.recordMetric(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/active", activeThreads.median.toFloat) - NewRelic.recordMetric(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/inactive", (poolSize.median.toFloat-activeThreads.median.toFloat)) - - NewRelic.recordMetric(s"Custom/$actorSystem/Dispatcher/$dispatcher/Queue", queueSize.median.toFloat) - } - } -} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/TraceContext.scala b/kamon-core/src/main/scala/kamon/TraceContext.scala deleted file mode 100644 index 63cdb488..00000000 --- a/kamon-core/src/main/scala/kamon/TraceContext.scala +++ /dev/null @@ -1,64 +0,0 @@ -package kamon - -import java.util.UUID -import akka.actor._ -import java.util.concurrent.atomic.AtomicLong -import kamon.trace.UowTraceAggregator -import scala.concurrent.duration._ -import kamon.newrelic.NewRelicReporting -import kamon.trace.UowTracing.Start - -// TODO: Decide if we need or not an ID, generating it takes time and it doesn't seem necessary. -case class TraceContext(id: Long, tracer: ActorRef, uow: String = "", userContext: Option[Any] = None) - -object TraceContext { - val reporter = Kamon.actorSystem.actorOf(Props[NewRelicReporting]) - val traceIdCounter = new AtomicLong - - def apply()(implicit system: ActorSystem) = { - val n = traceIdCounter.incrementAndGet() - val actor = system.actorOf(UowTraceAggregator.props(reporter, 30 seconds), s"tracer-${n}") - actor ! Start() - - new TraceContext(n, actor) // TODO: Move to a kamon specific supervisor, like /user/kamon/tracer - } -} - - - -class TraceAccumulator extends Actor { - def receive = { - case a => println("Trace Accumulated: "+a) - } -} - - -trait TraceEntry -case class CodeBlockExecutionTime(name: String, begin: Long, end: Long) extends TraceEntry -case class TransactionTrace(id: UUID, start: Long, end: Long, entries: Seq[TraceEntry]) - -object Collector - -trait TraceEntryStorage { - def store(entry: TraceEntry): Boolean -} - -class TransactionContext(val id: UUID, private val storage: TraceEntryStorage) { - def store(entry: TraceEntry) = storage.store(entry) -} - -object ThreadLocalTraceEntryStorage extends TraceEntryStorage { - - private val storage = new ThreadLocal[List[TraceEntry]] { - override def initialValue(): List[TraceEntry] = Nil - } - - def update(f: List[TraceEntry] => List[TraceEntry]) = storage set f(storage.get) - - def store(entry: TraceEntry): Boolean = { - update(entry :: _) - true - } -} - - diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala index 4e47c2a4..4e078201 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala @@ -2,8 +2,9 @@ package kamon.instrumentation import akka.actor.{Props, ActorSystem, ActorRef} import akka.dispatch.{MessageDispatcher, Envelope} -import kamon.{Tracer, TraceContext} +import kamon.{Tracer} import kamon.instrumentation.SimpleContextPassingInstrumentation.SimpleTraceMessage +import kamon.trace.TraceContext trait ActorInstrumentationConfiguration { def sendMessageTransformation(from: ActorRef, to: ActorRef, message: Any): Any diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingInstrumentation.scala index 47d1756f..9b53bd5d 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingInstrumentation.scala @@ -1,9 +1,10 @@ package kamon.instrumentation import org.aspectj.lang.annotation.{Around, Pointcut, DeclareMixin, Aspect} -import kamon.{Tracer, TraceContext} +import kamon.{Tracer} import org.aspectj.lang.ProceedingJoinPoint import org.slf4j.MDC +import kamon.trace.TraceContext @Aspect class ActorLoggingInstrumentation { diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala index 7b5d5339..9b5ce0a4 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala @@ -3,11 +3,12 @@ package kamon.instrumentation import org.aspectj.lang.annotation._ import org.aspectj.lang.ProceedingJoinPoint import akka.actor.{Props, ActorSystem, ActorRef} -import kamon.{Tracer, TraceContext} +import kamon.{Tracer} import akka.dispatch.{Envelope, MessageDispatcher} import com.codahale.metrics.Timer import scala.Some import kamon.trace.context.TracingAwareContext +import kamon.trace.TraceContext case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context) case class DefaultTracingAwareEnvelopeContext(traceContext: Option[TraceContext] = Tracer.traceContext.value, timestamp: Long = System.nanoTime) extends TracingAwareContext diff --git a/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala index 992cfa82..2be6e5d1 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala @@ -1,9 +1,10 @@ package kamon.instrumentation import org.aspectj.lang.annotation._ -import kamon.{Tracer, TraceContext} +import kamon.{Tracer} import org.aspectj.lang.ProceedingJoinPoint import scala.Some +import kamon.trace.TraceContext /** * Marker interface, just to make sure we don't instrument all the Runnables in the classpath. diff --git a/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala deleted file mode 100644 index 2239f382..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala +++ /dev/null @@ -1,115 +0,0 @@ -package kamon.instrumentation - -import org.aspectj.lang.annotation._ -import kamon.{TraceContext, Tracer} -import kamon.trace.UowTracing._ -import kamon.trace.context.TracingAwareContext -import org.aspectj.lang.ProceedingJoinPoint -import spray.http.HttpRequest -import kamon.trace.UowTracing.Finish -import kamon.trace.UowTracing.Rename -import spray.http.HttpHeaders.Host - -//import spray.can.client.HttpHostConnector.RequestContext - -trait ContextAware { - def traceContext: Option[TraceContext] -} - -trait TimedContextAware { - def timestamp: Long - def traceContext: Option[TraceContext] -} - -@Aspect -class SprayOpenRequestContextTracing { - @DeclareMixin("spray.can.server.OpenRequestComponent.DefaultOpenRequest") - def mixinContextAwareToOpenRequest: ContextAware = new ContextAware { - val traceContext: Option[TraceContext] = Tracer.traceContext.value - } - - @DeclareMixin("spray.can.client.HttpHostConnector.RequestContext") - def mixinContextAwareToRequestContext: TimedContextAware = new TimedContextAware { - val timestamp: Long = System.nanoTime() - val traceContext: Option[TraceContext] = Tracer.traceContext.value - } -} - -@Aspect -class SprayServerInstrumentation { - - @Pointcut("execution(spray.can.server.OpenRequestComponent$DefaultOpenRequest.new(..)) && this(openRequest) && args(*, request, *, *)") - def openRequestInit(openRequest: ContextAware, request: HttpRequest): Unit = {} - - @After("openRequestInit(openRequest, request)") - def afterInit(openRequest: ContextAware, request: HttpRequest): Unit = { - Tracer.start - openRequest.traceContext - - Tracer.context().map(_.tracer ! Rename(request.uri.path.toString())) - } - - @Pointcut("execution(* spray.can.server.OpenRequestComponent$DefaultOpenRequest.handleResponseEndAndReturnNextOpenRequest(..)) && target(openRequest)") - def openRequestCreation(openRequest: ContextAware): Unit = {} - - @After("openRequestCreation(openRequest)") - def afterFinishingRequest(openRequest: ContextAware): Unit = { - val original = openRequest.traceContext - Tracer.context().map(_.tracer ! Finish()) - - if(Tracer.context() != original) { - println(s"OMG DIFFERENT Original: [${original}] - Came in: [${Tracer.context}]") - } - } - - @Pointcut("execution(spray.can.client.HttpHostConnector.RequestContext.new(..)) && this(ctx) && args(request, *, *, *)") - def requestRecordInit(ctx: TimedContextAware, request: HttpRequest): Unit = {} - - @After("requestRecordInit(ctx, request)") - def whenCreatedRequestRecord(ctx: TimedContextAware, request: HttpRequest): Unit = { - // Necessary to force the initialization of TracingAwareRequestContext at the moment of creation. - for{ - tctx <- ctx.traceContext - host <- request.header[Host] - } tctx.tracer ! WebExternalStart(ctx.timestamp, host.host) - } - - - - @Pointcut("execution(* spray.can.client.HttpHostConnectionSlot.dispatchToCommander(..)) && args(requestContext, message)") - def dispatchToCommander(requestContext: TimedContextAware, message: Any): Unit = {} - - @Around("dispatchToCommander(requestContext, message)") - def aroundDispatchToCommander(pjp: ProceedingJoinPoint, requestContext: TimedContextAware, message: Any) = { - println("Completing the request with context: " + requestContext.traceContext) - - Tracer.traceContext.withValue(requestContext.traceContext) { - requestContext.traceContext.map { - tctx => tctx.tracer ! WebExternalFinish(requestContext.timestamp) - } - pjp.proceed() - } - - } - - - @Pointcut("execution(* spray.can.client.HttpHostConnector.RequestContext.copy(..)) && this(old)") - def copyingRequestContext(old: TimedContextAware): Unit = {} - - @Around("copyingRequestContext(old)") - def aroundCopyingRequestContext(pjp: ProceedingJoinPoint, old: TimedContextAware) = { - println("Instrumenting the request context copy.") - Tracer.traceContext.withValue(old.traceContext) { - pjp.proceed() - } - } -} - -case class DefaultTracingAwareRequestContext(traceContext: Option[TraceContext] = Tracer.context(), timestamp: Long = System.nanoTime) extends TracingAwareContext - -@Aspect -class SprayRequestContextTracing { - - @DeclareMixin("spray.can.client.HttpHostConnector.RequestContext") - def mixin: TracingAwareContext = DefaultTracingAwareRequestContext() -} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/context/TracingAwareContext.scala b/kamon-core/src/main/scala/kamon/trace/context/TracingAwareContext.scala index 3766dd22..c8d0d4f0 100644 --- a/kamon-core/src/main/scala/kamon/trace/context/TracingAwareContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/context/TracingAwareContext.scala @@ -1,6 +1,6 @@ package kamon.trace.context -import kamon.TraceContext +import kamon.trace.TraceContext trait TracingAwareContext { def traceContext: Option[TraceContext] diff --git a/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala b/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala index 21be4a73..cdfa2813 100644 --- a/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala +++ b/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala @@ -4,12 +4,13 @@ import org.scalatest.{WordSpecLike, Matchers} import akka.actor.{ActorRef, Actor, Props, ActorSystem} import akka.testkit.{ImplicitSender, TestKit} -import kamon.{TraceContext, Tracer} +import kamon.{Tracer} import akka.pattern.{pipe, ask} import akka.util.Timeout import scala.concurrent.duration._ import scala.concurrent.{Await, Future} import akka.routing.RoundRobinRouter +import kamon.trace.TraceContext class ActorInstrumentationSpec extends TestKit(ActorSystem("ActorInstrumentationSpec")) with WordSpecLike with Matchers with ImplicitSender { diff --git a/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala index 6010a185..570f64dd 100644 --- a/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala +++ b/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala @@ -3,12 +3,13 @@ package kamon.instrumentation import scala.concurrent.{Await, Promise, Future} import org.scalatest.{Matchers, OptionValues, WordSpec} import org.scalatest.concurrent.{ScalaFutures, PatienceConfiguration} -import kamon.{Tracer, Kamon, TraceContext} +import kamon.{Tracer, Kamon} import java.util.UUID import scala.util.Success import scala.concurrent.duration._ import java.util.concurrent.TimeUnit import akka.actor.ActorSystem +import kamon.trace.TraceContext class RunnableInstrumentationSpec extends WordSpec with Matchers with ScalaFutures with PatienceConfiguration with OptionValues { -- cgit v1.2.3