diff options
50 files changed, 1236 insertions, 440 deletions
@@ -1,6 +1,7 @@ *.class *.log .history +*.sc # sbt specific dist/* @@ -1,2 +1,9 @@ Kamon ===== + + + + +/metrics/actorsystem/{actorsystem-name}/dispatcher/{dispatcher-name}/ +For each dispatcher, show: + -
\ No newline at end of file diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop.xml new file mode 100644 index 00000000..e6d61fa1 --- /dev/null +++ b/kamon-core/src/main/resources/META-INF/aop.xml @@ -0,0 +1,34 @@ +<!DOCTYPE aspectj PUBLIC "-//AspectJ//DTD//EN" "http://www.eclipse.org/aspectj/dtd/aspectj.dtd"> + +<aspectj> + <weaver options="-verbose -showWeaveInfo"> + <!--<dump within="*" beforeandafter="true"/>--> + </weaver> + + <aspects> + + <aspect name="kamon.instrumentation.ActorRefTellInstrumentation"/> + <aspect name="kamon.instrumentation.ActorCellInvokeInstrumentation"/> + <aspect name="kamon.instrumentation.RunnableInstrumentation" /> + <aspect name="kamon.instrumentation.MessageQueueInstrumentation" /> + + <aspect name="kamon.instrumentation.InceptionAspect"/> + + <!-- ExecutorService Instrumentation for Akka. --> +<!-- <aspect name="kamon.instrumentation.ExecutorServiceFactoryProviderInstrumentation"/> + <aspect name="kamon.instrumentation.NamedExecutorServiceFactoryDelegateInstrumentation"/>--> + <aspect name="kamon.instrumentation.ActorSystemInstrumentation"/> + <aspect name ="kamon.instrumentation.ForkJoinPoolInstrumentation"/> + + + + <include within="*"/> + <exclude within="javax..*"/> + <exclude within="org.aspectj..*"/> + <exclude within="scala..*"/> + <exclude within="scalaz..*"/> + <exclude within="scalad..*"/> + <exclude within="play..*"/> + </aspects> + +</aspectj> diff --git a/src/main/resources/application.conf b/kamon-core/src/main/resources/application.conf index 370acae9..370acae9 100644 --- a/src/main/resources/application.conf +++ b/kamon-core/src/main/resources/application.conf diff --git a/src/main/resources/newrelic.yml b/kamon-core/src/main/resources/newrelic.yml index e347635e..1b1ad53b 100644 --- a/src/main/resources/newrelic.yml +++ b/kamon-core/src/main/resources/newrelic.yml @@ -48,7 +48,7 @@ common: &default_settings # This setting is dynamic, so changes do not require restarting your application. # The levels in increasing order of verboseness are: off, severe, warning, info, fine, finer, finest # Default is info. - log_level: info + log_level: finest enable_custom_tracing: true # Log all data to and from New Relic in plain text. @@ -70,7 +70,7 @@ common: &default_settings # The log file directory. # Default is the logs directory in the newrelic.jar parent directory. - #log_file_path: + log_file_path: /home/ivantopo/Desktop/tmp # The agent communicates with New Relic via https by # default. If you want to communicate with newrelic via http, diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala new file mode 100644 index 00000000..c3080909 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -0,0 +1,132 @@ +package kamon + +import akka.actor.{Actor, Props, ActorSystem} +import scala.collection.JavaConverters._ +import java.util.concurrent.ConcurrentHashMap +import kamon.metric.{HistogramSnapshot, Histogram, Atomic, ActorSystemMetrics} +import scala.concurrent.duration.{FiniteDuration, Duration} +import com.newrelic.api.agent.NewRelic + +object Kamon { + + val ctx = new ThreadLocal[Option[TraceContext]] { + override def initialValue() = None + } + + implicit lazy val actorSystem = ActorSystem("kamon") + + + def context() = ctx.get() + def clear = ctx.remove() + def set(traceContext: TraceContext) = ctx.set(Some(traceContext)) + + def start = set(newTraceContext) + def stop = ctx.get match { + case Some(context) => context.close + case None => + } + + def newTraceContext(): TraceContext = TraceContext() + + + val publisher = actorSystem.actorOf(Props[TransactionPublisher]) + + def publish(tx: FullTransaction) = publisher ! tx + + + + object Metric { + val actorSystems = new ConcurrentHashMap[String, ActorSystemMetrics] asScala + + def actorSystemNames: List[String] = actorSystems.keys.toList + def registerActorSystem(name: String) = actorSystems.getOrElseUpdate(name, ActorSystemMetrics(name)) + + def actorSystem(name: String): Option[ActorSystemMetrics] = actorSystems.get(name) + } + + + + val metricManager = actorSystem.actorOf(Props[MetricManager], "metric-manager") + val newrelicReporter = actorSystem.actorOf(Props[NewrelicReporterActor], "newrelic-reporter") + +} + + + + + + + + + +object Tracer { + val ctx = new ThreadLocal[Option[TraceContext]] { + override def initialValue() = None + } + + def context() = ctx.get() + def clear = ctx.remove() + def set(traceContext: TraceContext) = ctx.set(Some(traceContext)) + + def start = ??? //set(newTraceContext) + def stop = ctx.get match { + case Some(context) => context.close + case None => + } + + //def newTraceContext(): TraceContext = TraceContext() +} + + +class MetricManager extends Actor { + implicit val ec = context.system.dispatcher + + def receive = { + case RegisterForAllDispatchers(frequency) => { + val subscriber = sender + context.system.scheduler.schedule(frequency, frequency) { + Kamon.Metric.actorSystems.foreach { + case (asName, actorSystemMetrics) => actorSystemMetrics.dispatchers.foreach { + case (dispatcherName, dispatcherMetrics) => { + val activeThreads = dispatcherMetrics.activeThreadCount.snapshot + val poolSize = dispatcherMetrics.poolSize.snapshot + val queueSize = dispatcherMetrics.queueSize.snapshot + + subscriber ! DispatcherMetrics(asName, dispatcherName, activeThreads, poolSize, queueSize) + + } + } + } + } + } + } +} + +case class RegisterForAllDispatchers(frequency: FiniteDuration) +case class DispatcherMetrics(actorSystem: String, dispatcher: String, activeThreads: HistogramSnapshot, poolSize: HistogramSnapshot, queueSize: HistogramSnapshot) + + + + + + +class NewrelicReporterActor extends Actor { + import scala.concurrent.duration._ + + Kamon.metricManager ! RegisterForAllDispatchers(5 seconds) + + def receive = { + case DispatcherMetrics(actorSystem, dispatcher, activeThreads, poolSize, queueSize) => { + /*println("PUBLISHED DISPATCHER STATS") + println(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/active =>" + activeThreads.median.toFloat) + println(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/inactive =>" + (poolSize.median.toFloat-activeThreads.median.toFloat)) + println(s"Custom/$actorSystem/Dispatcher/$dispatcher/Queue =>" + queueSize.median.toFloat)*/ + + + NewRelic.recordMetric(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/active", activeThreads.median.toFloat) + NewRelic.recordMetric(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/inactive", (poolSize.median.toFloat-activeThreads.median.toFloat)) + + NewRelic.recordMetric(s"Custom/$actorSystem/Dispatcher/$dispatcher/Queue", queueSize.median.toFloat) + } + } +}
\ No newline at end of file diff --git a/src/main/scala/kamon/TraceContext.scala b/kamon-core/src/main/scala/kamon/TraceContext.scala index 19ebc578..6b32550f 100644 --- a/src/main/scala/kamon/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/TraceContext.scala @@ -20,6 +20,7 @@ case class TraceContext(id: UUID, private val entries: Agent[List[TraceEntry]], } object TraceContext { + implicit val as2 = Kamon.actorSystem.dispatcher def apply()(implicit actorSystem: ActorSystem) = new TraceContext(UUID.randomUUID(), Agent[List[TraceEntry]](Nil)) } @@ -28,3 +29,39 @@ object TraceContext { trait TraceEntry case class CodeBlockExecutionTime(name: String, begin: Long, end: Long) extends TraceEntry + + + +case class TransactionTrace(id: UUID, start: Long, end: Long, entries: Seq[TraceEntry]) + + + + + +object Collector { + +} + +trait TraceEntryStorage { + def store(entry: TraceEntry): Boolean +} + +class TransactionContext(val id: UUID, private val storage: TraceEntryStorage) { + def store(entry: TraceEntry) = storage.store(entry) +} + +object ThreadLocalTraceEntryStorage extends TraceEntryStorage { + + private val storage = new ThreadLocal[List[TraceEntry]] { + override def initialValue(): List[TraceEntry] = Nil + } + + def update(f: List[TraceEntry] => List[TraceEntry]) = storage set f(storage.get) + + def store(entry: TraceEntry): Boolean = { + update(entry :: _) + true + } +} + + diff --git a/src/main/scala/kamon/TraceContextSwap.scala b/kamon-core/src/main/scala/kamon/TraceContextSwap.scala index 68ee808b..68ee808b 100644 --- a/src/main/scala/kamon/TraceContextSwap.scala +++ b/kamon-core/src/main/scala/kamon/TraceContextSwap.scala diff --git a/src/main/scala/kamon/TransactionPublisher.scala b/kamon-core/src/main/scala/kamon/TransactionPublisher.scala index 0626b91d..0626b91d 100644 --- a/src/main/scala/kamon/TransactionPublisher.scala +++ b/kamon-core/src/main/scala/kamon/TransactionPublisher.scala diff --git a/src/main/scala/kamon/executor/eventbus.scala b/kamon-core/src/main/scala/kamon/executor/eventbus.scala index ed76334f..599f2a7a 100644 --- a/src/main/scala/kamon/executor/eventbus.scala +++ b/kamon-core/src/main/scala/kamon/executor/eventbus.scala @@ -7,8 +7,7 @@ import java.util.concurrent.TimeUnit import kamon.{CodeBlockExecutionTime, Kamon, TraceContext} import akka.util.Timeout -import scala.util.Success -import scala.util.Failure +import scala.util.{Random, Success, Failure} import scala.concurrent.Future trait Message @@ -35,31 +34,24 @@ class AppActorEventBus extends ActorEventBus with LookupClassification{ case class Ping() case class Pong() -class PingActor(val target: ActorRef) extends Actor with ActorLogging { - implicit def executionContext = context.dispatcher - implicit val timeout = Timeout(30, TimeUnit.SECONDS) +class PingActor extends Actor with ActorLogging { + val pong = context.actorOf(Props[PongActor]) + val random = new Random() def receive = { case Pong() => { - log.info(s"pong with context ${Kamon.context}") - Thread.sleep(1000) - sender ! Ping() + //Thread.sleep(random.nextInt(2000)) + //log.info("Message from Ping") + pong ! Ping() } - case a: Any => println(s"Got ${a} in PING"); Thread.sleep(1000) } - - def withAny(): Any = {1} - def withAnyRef(): AnyRef = {new Object} } class PongActor extends Actor with ActorLogging { def receive = { case Ping() => { - Thread.sleep(3000) sender ! Pong() - log.info(s"ping with context ${Kamon.context}") } - case a: Any => println(s"Got ${a} in PONG") } } @@ -74,8 +66,11 @@ object TryAkka extends App{ } })) - - + Kamon.start + for(i <- 1 to 4) { + val ping = system.actorOf(Props[PingActor]) + ping ! Pong() + } def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${Kamon.context}] : $body") @@ -100,8 +95,8 @@ object TryAkka extends App{ Kamon.stop - Thread.sleep(3000) - system.shutdown() + //Thread.sleep(3000) + //system.shutdown() /* appActorEventBus.subscribe(subscriber, NEW_POST_CHANNEL) appActorEventBus.publish(MessageEvent(NEW_POST_CHANNEL,PostMessage(text="hello world")))*/ diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala new file mode 100644 index 00000000..82915ce9 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala @@ -0,0 +1,89 @@ +package kamon.instrumentation + +import org.aspectj.lang.annotation._ +import org.aspectj.lang.ProceedingJoinPoint +import akka.actor.{Props, ActorSystem, ActorRef} +import kamon.{Kamon, TraceContext} +import akka.dispatch.{MessageDispatcher, Envelope} +import com.codahale.metrics.{Timer, ExponentiallyDecayingReservoir, Histogram} +import kamon.metric.{MetricDirectory, Metrics} +import com.codahale.metrics +import kamon.instrumentation.TraceableMessage +import scala.Some + +case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context) + + +@Aspect +class ActorRefTellInstrumentation { + import ProceedingJoinPointPimp._ + + @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && target(actor) && args(message, sender)") + def sendingMessageToActorRef(actor: ActorRef, message: Any, sender: ActorRef) = {} + + @Around("sendingMessageToActorRef(actor, message, sender)") + def around(pjp: ProceedingJoinPoint, actor: ActorRef, message: Any, sender: ActorRef): Unit = { + + val actorName = MetricDirectory.nameForActor(actor) + val t = Metrics.registry.timer(actorName + "LATENCY") + //println(s"About to proceed with: $actor $message $sender ${Kamon.context}") + pjp.proceedWithTarget(actor, TraceableMessage(Kamon.context, message, t.time()), sender) + } +} + + +@Aspect("perthis(actorCellCreation(..))") +class ActorCellInvokeInstrumentation { + + var processingTimeTimer: Timer = _ + var shouldTrack = false + + // AKKA 2.2 introduces the dispatcher parameter. Maybe we could provide a dual pointcut. + + @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, dispatcher, parent)") + def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {} + + @After("actorCellCreation(system, ref, props, dispatcher, parent)") + def registerMetricsInRegistry(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = { + val actorName = MetricDirectory.nameForActor(ref) + val histogramName = MetricDirectory.nameForMailbox(system.name, actorName) + + /** TODO: Find a better way to filter the things we don't want to measure. */ + //if(system.name != "kamon" && actorName.startsWith("/user")) { + processingTimeTimer = Metrics.registry.timer(histogramName + "/PROCESSINGTIME") + shouldTrack = true + //} + } + + + @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && args(envelope)") + def invokingActorBehaviourAtActorCell(envelope: Envelope) = {} + + + @Around("invokingActorBehaviourAtActorCell(envelope)") + def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = { + import ProceedingJoinPointPimp._ + println("ENVELOPE --------------------->"+envelope) + envelope match { + case Envelope(TraceableMessage(ctx, msg, timer), sender) => { + timer.stop() + + val originalEnvelope = envelope.copy(message = msg) + + //println("PROCESSING TIME TIMER: "+processingTimeTimer) + val pt = processingTimeTimer.time() + ctx match { + case Some(c) => { + Kamon.set(c) + println(s"ENVELOPE ORIGINAL: [$c]---------------->"+originalEnvelope) + pjp.proceedWith(originalEnvelope) + Kamon.clear + } + case None => pjp.proceedWith(originalEnvelope) + } + pt.stop() + } + case _ => pjp.proceed + } + } +} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala b/kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala new file mode 100644 index 00000000..84c20c52 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala @@ -0,0 +1,23 @@ +package kamon.instrumentation + +import org.aspectj.lang.ProceedingJoinPoint + +trait ProceedingJoinPointPimp { + import language.implicitConversions + + implicit def pimpProceedingJointPoint(pjp: ProceedingJoinPoint) = RichProceedingJointPoint(pjp) +} + +object ProceedingJoinPointPimp extends ProceedingJoinPointPimp + +case class RichProceedingJointPoint(pjp: ProceedingJoinPoint) { + def proceedWith(newUniqueArg: AnyRef) = { + val args = pjp.getArgs + args.update(0, newUniqueArg) + pjp.proceed(args) + } + + def proceedWithTarget(args: AnyRef*) = { + pjp.proceed(args.toArray) + } +} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala new file mode 100644 index 00000000..b4f8a475 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala @@ -0,0 +1,245 @@ +package kamon.instrumentation + +import org.aspectj.lang.annotation._ +import java.util.concurrent._ +import org.aspectj.lang.ProceedingJoinPoint +import java.util +import kamon.metric.{DispatcherMetricCollector, Histogram, MetricDirectory, ExecutorServiceMetricCollector} +import akka.dispatch.{MonitorableThreadFactory, ExecutorServiceFactory} +import com.typesafe.config.Config +import kamon.Kamon +import scala.concurrent.forkjoin.ForkJoinPool +import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool + + +@Aspect +class ActorSystemInstrumentation { + + @Pointcut("execution(akka.actor.ActorSystemImpl.new(..)) && args(name, applicationConfig, classLoader)") + def actorSystemInstantiation(name: String, applicationConfig: Config, classLoader: ClassLoader) = {} + + @After("actorSystemInstantiation(name, applicationConfig, classLoader)") + def registerActorSystem(name: String, applicationConfig: Config, classLoader: ClassLoader): Unit = { + + Kamon.Metric.registerActorSystem(name) + } +} + +@Aspect("perthis(forkJoinPoolInstantiation(int, scala.concurrent.forkjoin.ForkJoinPool.ForkJoinWorkerThreadFactory, java.lang.Thread.UncaughtExceptionHandler))") +class ForkJoinPoolInstrumentation { + var activeThreadsHistogram: Histogram = _ + var poolSizeHistogram: Histogram = _ + + @Pointcut("execution(akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool.new(..)) && args(parallelism, threadFactory, exceptionHandler)") + def forkJoinPoolInstantiation(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler) = {} + + @After("forkJoinPoolInstantiation(parallelism, threadFactory, exceptionHandler)") + def initializeMetrics(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler): Unit = { + val (actorSystemName, dispatcherName) = threadFactory match { + case mtf: MonitorableThreadFactory => splitName(mtf.name, Kamon.Metric.actorSystemNames) + case _ => ("Unknown", "Unknown") + } + + val metrics = Kamon.Metric.actorSystem(actorSystemName).get.registerDispatcher(dispatcherName) + for(m <- metrics) { + activeThreadsHistogram = m.activeThreadCount + poolSizeHistogram = m.poolSize + println(s"Registered $dispatcherName for actor system $actorSystemName") + } + } + + def splitName(threadFactoryName: String, knownActorSystems: List[String]): (String, String) = { + knownActorSystems.find(threadFactoryName.startsWith(_)).map(asName => (asName, threadFactoryName.substring(asName.length+1))).getOrElse(("Unkown", "Unkown")) + } + + + + + @Pointcut("execution(* scala.concurrent.forkjoin.ForkJoinPool.scan(..)) && this(fjp)") + def forkJoinScan(fjp: AkkaForkJoinPool): Unit = {} + + @After("forkJoinScan(fjp)") + def updateMetrics(fjp: AkkaForkJoinPool): Unit = { + activeThreadsHistogram.update(fjp.getActiveThreadCount) + poolSizeHistogram.update(fjp.getPoolSize) + } + + + +} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +/** + * ExecutorService monitoring base: + */ +trait ExecutorServiceCollector { + def updateActiveThreadCount(diff: Int): Unit + def updateTotalThreadCount(diff: Int): Unit + def updateQueueSize(diff: Int): Unit +} + +trait WatchedExecutorService { + def collector: ExecutorServiceCollector +} + + + + + + + + + + + + + + +trait ExecutorServiceMonitoring { + def dispatcherMetrics: DispatcherMetricCollector +} + +class ExecutorServiceMonitoringImpl extends ExecutorServiceMonitoring { + @volatile var dispatcherMetrics: DispatcherMetricCollector = _ +} + + + + + + + + + + + + + + + + +case class NamedExecutorServiceFactoryDelegate(actorSystemName: String, dispatcherName: String, delegate: ExecutorServiceFactory) extends ExecutorServiceFactory { + def createExecutorService: ExecutorService = delegate.createExecutorService +} + +@Aspect +class ExecutorServiceFactoryProviderInstrumentation { + + @Pointcut("execution(* akka.dispatch.ExecutorServiceFactoryProvider+.createExecutorServiceFactory(..)) && args(dispatcherName, threadFactory) && if()") + def factoryMethodCall(dispatcherName: String, threadFactory: ThreadFactory): Boolean = { + true + } + + @Around("factoryMethodCall(dispatcherName, threadFactory)") + def enrichFactoryCreationWithNames(pjp: ProceedingJoinPoint, dispatcherName: String, threadFactory: ThreadFactory): ExecutorServiceFactory = { + val delegate = pjp.proceed().asInstanceOf[ExecutorServiceFactory] // Safe Cast + + val actorSystemName = threadFactory match { + case m: MonitorableThreadFactory => m.name + case _ => "Unknown" // Find an alternative way to find the actor system name in case we start seeing "Unknown" as the AS name. + } + + new NamedExecutorServiceFactoryDelegate(actorSystemName, dispatcherName, delegate) + } + +} + + +@Aspect +class NamedExecutorServiceFactoryDelegateInstrumentation { + + @Pointcut("execution(* akka.dispatch.NamedExecutorServiceFactoryDelegate.createExecutorService()) && this(namedFactory)") + def factoryMethodCall(namedFactory: NamedExecutorServiceFactoryDelegate) = {} + + @Around("factoryMethodCall(namedFactory)") + def enrichExecutorServiceWithMetricNameRoot(pjp: ProceedingJoinPoint, namedFactory: NamedExecutorServiceFactoryDelegate): ExecutorService = { + val delegate = pjp.proceed().asInstanceOf[ExecutorService] + val executorFullName = MetricDirectory.nameForDispatcher(namedFactory.actorSystemName, namedFactory.dispatcherName) + + ExecutorServiceMetricCollector.register(executorFullName, delegate) + + new NamedExecutorServiceDelegate(executorFullName, delegate) + } +} + +case class NamedExecutorServiceDelegate(fullName: String, delegate: ExecutorService) extends ExecutorService { + def shutdown() = { + ExecutorServiceMetricCollector.deregister(fullName) + delegate.shutdown() + } + def shutdownNow(): util.List[Runnable] = delegate.shutdownNow() + def isShutdown: Boolean = delegate.isShutdown + def isTerminated: Boolean = delegate.isTerminated + def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = delegate.awaitTermination(timeout, unit) + def submit[T](task: Callable[T]): Future[T] = delegate.submit(task) + def submit[T](task: Runnable, result: T): Future[T] = delegate.submit(task, result) + def submit(task: Runnable): Future[_] = delegate.submit(task) + def invokeAll[T](tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] = delegate.invokeAll(tasks) + def invokeAll[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): util.List[Future[T]] = delegate.invokeAll(tasks, timeout, unit) + def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T = delegate.invokeAny(tasks) + def invokeAny[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T = delegate.invokeAny(tasks, timeout, unit) + def execute(command: Runnable) = delegate.execute(command) +} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala new file mode 100644 index 00000000..c21502ac --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala @@ -0,0 +1,73 @@ +package kamon.instrumentation + +import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram} +import akka.dispatch.{UnboundedMessageQueueSemantics, Envelope, MessageQueue} +import org.aspectj.lang.annotation.{Around, Pointcut, DeclareMixin, Aspect} +import akka.actor.{ActorSystem, ActorRef} +import kamon.metric.{Metrics, MetricDirectory} +import org.aspectj.lang.ProceedingJoinPoint + + +/** + * For Mailboxes we would like to track the queue size and message latency. Currently the latency + * will be gathered from the ActorCellMetrics. + */ + + +@Aspect +class MessageQueueInstrumentation { + + @Pointcut("execution(* akka.dispatch.MailboxType+.create(..)) && args(owner, system)") + def messageQueueCreation(owner: Option[ActorRef], system: Option[ActorSystem]) = {} + + @Around("messageQueueCreation(owner, system)") + def wrapMessageQueue(pjp: ProceedingJoinPoint, owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = { + val delegate = pjp.proceed.asInstanceOf[MessageQueue] + + // We are not interested in monitoring mailboxes if we don't know where they belong to. + val monitoredMailbox = for(own <- owner; sys <- system) yield { + val systemName = sys.name + val ownerName = MetricDirectory.nameForActor(own) + val mailBoxName = MetricDirectory.nameForMailbox(systemName, ownerName) + + val queueSizeHistogram = new Histogram(new ExponentiallyDecayingReservoir()) + Metrics.include(mailBoxName, queueSizeHistogram) + + new MonitoredMessageQueue(delegate, queueSizeHistogram) + } + + monitoredMailbox match { + case None => delegate + case Some(mmb) => mmb + } + } +} + + +class MonitoredMessageQueue(val delegate: MessageQueue, val queueSizeHistogram: Histogram) extends MessageQueue with UnboundedMessageQueueSemantics{ + + def enqueue(receiver: ActorRef, handle: Envelope) = { + delegate.enqueue(receiver, handle) + queueSizeHistogram.update(numberOfMessages) + } + + def dequeue(): Envelope = { + val envelope = delegate.dequeue() + queueSizeHistogram.update(numberOfMessages) + + envelope + } + + def numberOfMessages: Int = delegate.numberOfMessages + def hasMessages: Boolean = delegate.hasMessages + def cleanUp(owner: ActorRef, deadLetters: MessageQueue) = delegate.cleanUp(owner, deadLetters) +} + + + + + + + + + diff --git a/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala index ef908625..e75a638f 100644 --- a/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala @@ -45,6 +45,12 @@ class RunnableInstrumentation { */ import kamon.TraceContextSwap.withContext + @Before("instrumentedRunnableCreation()") + def beforeCreation = { + //println((new Throwable).getStackTraceString) + } + + @Around("runnableExecution()") def around(pjp: ProceedingJoinPoint) = { import pjp._ diff --git a/kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala new file mode 100644 index 00000000..74261403 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala @@ -0,0 +1,49 @@ +package kamon.instrumentation + +import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram} +import org.aspectj.lang.annotation.{After, Pointcut, DeclareMixin, Aspect} + +class ActorCage(val name: String, val size: Int) { + + def doIt: Unit = println("name") +} + +trait CageMonitoring { + def histogram: Histogram + def count(value: Int): Unit +} + +class CageMonitoringImp extends CageMonitoring{ + final val histogram = new Histogram(new ExponentiallyDecayingReservoir()) + + def count(value: Int) = histogram.update(value) + +} + + +@Aspect +class InceptionAspect { + + @DeclareMixin("kamon.instrumentation.ActorCage") + def mixin: CageMonitoring = new CageMonitoringImp + + + @Pointcut("execution(* kamon.instrumentation.ActorCage.doIt()) && target(actorCage)") + def theActorCageDidIt(actorCage: CageMonitoring) = {} + + @After("theActorCageDidIt(actorCage)") + def afterDoingIt(actorCage: CageMonitoring) = { + actorCage.count(1) + actorCage.histogram.getSnapshot.dump(System.out) + } + + + +} + + +object Runner extends App { + val cage = new ActorCage("ivan", 10) + + cage.doIt +} diff --git a/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala b/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala new file mode 100644 index 00000000..54a13f39 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala @@ -0,0 +1,67 @@ +package kamon.metric + +import java.util.concurrent.{ThreadPoolExecutor, ExecutorService} +import scala.concurrent.forkjoin.ForkJoinPool +import com.codahale.metrics.{Metric, MetricFilter} + +object ExecutorServiceMetricCollector extends ForkJoinPoolMetricCollector with ThreadPoolExecutorMetricCollector { + + def register(fullName: String, executorService: ExecutorService) = executorService match { + case fjp: ForkJoinPool => registerForkJoinPool(fullName, fjp) + case tpe: ThreadPoolExecutor => registerThreadPoolExecutor(fullName, tpe) + case _ => // If it is a unknown Executor then just do nothing. + } + + def deregister(fullName: String) = { + Metrics.registry.removeMatching(new MetricFilter { + def matches(name: String, metric: Metric): Boolean = name.startsWith(fullName) + }) + } +} + + +trait ForkJoinPoolMetricCollector { + import GaugeGenerator._ + import BasicExecutorMetricNames._ + + + def registerForkJoinPool(fullName: String, fjp: ForkJoinPool) = { + val forkJoinPoolGauge = newNumericGaugeFor(fjp) _ + + val allMetrics = Map( + fullName + queueSize -> forkJoinPoolGauge(_.getQueuedTaskCount.toInt), + fullName + poolSize -> forkJoinPoolGauge(_.getPoolSize), + fullName + activeThreads -> forkJoinPoolGauge(_.getActiveThreadCount) + ) + + allMetrics.foreach { case (name, metric) => Metrics.registry.register(name, metric) } + } +} + +trait ThreadPoolExecutorMetricCollector { + import GaugeGenerator._ + import BasicExecutorMetricNames._ + + def registerThreadPoolExecutor(fullName: String, tpe: ThreadPoolExecutor) = { + val tpeGauge = newNumericGaugeFor(tpe) _ + + val allMetrics = Map( + fullName + queueSize -> tpeGauge(_.getQueue.size()), + fullName + poolSize -> tpeGauge(_.getPoolSize), + fullName + activeThreads -> tpeGauge(_.getActiveCount) + ) + + allMetrics.foreach { case (name, metric) => Metrics.registry.register(name, metric) } + } +} + + +object BasicExecutorMetricNames { + val queueSize = "queueSize" + val poolSize = "threads/poolSize" + val activeThreads = "threads/activeThreads" +} + + + + diff --git a/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala b/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala new file mode 100644 index 00000000..30635432 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala @@ -0,0 +1,12 @@ +package kamon.metric + +import com.codahale.metrics.Gauge + +trait GaugeGenerator { + + def newNumericGaugeFor[T, V >: AnyVal](target: T)(generator: T => V) = new Gauge[V] { + def getValue: V = generator(target) + } +} + +object GaugeGenerator extends GaugeGenerator diff --git a/kamon-core/src/main/scala/kamon/metric/MetricFilter.scala b/kamon-core/src/main/scala/kamon/metric/MetricFilter.scala new file mode 100644 index 00000000..fb117968 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/MetricFilter.scala @@ -0,0 +1,6 @@ +package kamon.metric + +object MetricFilter { + def actorSystem(system: String): Boolean = !system.startsWith("kamon") + def actor(path: String, system: String): Boolean = true +} diff --git a/kamon-core/src/main/scala/kamon/metric/Metrics.scala b/kamon-core/src/main/scala/kamon/metric/Metrics.scala new file mode 100644 index 00000000..cdc0a334 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/Metrics.scala @@ -0,0 +1,146 @@ +package kamon.metric + +import java.util.concurrent.{ConcurrentHashMap, ConcurrentSkipListSet, TimeUnit} +import akka.actor.ActorRef +import com.codahale.metrics +import com.codahale.metrics.{MetricFilter, Metric, ConsoleReporter, MetricRegistry} + + +object Metrics { + val registry: MetricRegistry = new MetricRegistry + + val consoleReporter = ConsoleReporter.forRegistry(registry).convertDurationsTo(TimeUnit.NANOSECONDS) + //consoleReporter.build().start(45, TimeUnit.SECONDS) + + //val newrelicReporter = NewRelicReporter(registry) + //newrelicReporter.start(5, TimeUnit.SECONDS) + + def include(name: String, metric: Metric) = { + //registry.register(name, metric) + } + + def exclude(name: String) = { + registry.removeMatching(new MetricFilter { + def matches(name: String, metric: Metric): Boolean = name.startsWith(name) + }) + } + + + + def deregister(fullName: String) = { + registry.removeMatching(new MetricFilter { + def matches(name: String, metric: Metric): Boolean = name.startsWith(fullName) + }) + } +} + +object Watched { + case object Actor + case object Dispatcher +} + +object MetricDirectory { + def nameForDispatcher(actorSystem: String, dispatcher: String) = s"/ActorSystem/${actorSystem}/Dispatcher/${dispatcher}/" + + def nameForMailbox(actorSystem: String, actor: String) = s"/ActorSystem/$actorSystem/Actor/$actor/Mailbox" + + def nameForActor(actorRef: ActorRef) = actorRef.path.elements.mkString("/") + + def shouldInstrument(actorSystem: String): Boolean = !actorSystem.startsWith("kamon") + + + def shouldInstrumentActor(actorPath: String): Boolean = { + !(actorPath.isEmpty || actorPath.startsWith("system")) + } + + +} + + + + + + + + + + + + +case class DispatcherMetricCollector(activeThreadCount: Histogram, poolSize: Histogram, queueSize: Histogram) + + + + +trait Histogram { + def update(value: Long): Unit + def snapshot: HistogramSnapshot +} + +trait HistogramSnapshot { + def median: Double + def max: Double + def min: Double +} + + +case class ActorSystemMetrics(actorSystemName: String) { + import scala.collection.JavaConverters._ + val dispatchers = new ConcurrentHashMap[String, DispatcherMetricCollector] asScala + + private[this] def createDispatcherCollector: DispatcherMetricCollector = DispatcherMetricCollector(CodahaleHistogram(), CodahaleHistogram(), CodahaleHistogram()) + + def registerDispatcher(dispatcherName: String): Option[DispatcherMetricCollector] = { + val stats = createDispatcherCollector + dispatchers.put(dispatcherName, stats) + Some(stats) + } + +} + + +case class CodahaleHistogram() extends Histogram { + private[this] val histogram = new com.codahale.metrics.Histogram(new metrics.ExponentiallyDecayingReservoir()) + + def update(value: Long) = histogram.update(value) + def snapshot: HistogramSnapshot = { + val snapshot = histogram.getSnapshot + + CodahaleHistogramSnapshot(snapshot.getMedian, snapshot.getMax, snapshot.getMin) + } +} + +case class CodahaleHistogramSnapshot(median: Double, max: Double, min: Double) extends HistogramSnapshot + + + + + + + +/** + * Dispatcher Metrics that we care about currently with a histogram-like nature: + * - Work Queue Size + * - Total/Active Thread Count + */ + + + +import annotation.tailrec +import java.util.concurrent.atomic.AtomicReference + +object Atomic { + def apply[T]( obj : T) = new Atomic(new AtomicReference(obj)) + implicit def toAtomic[T]( ref : AtomicReference[T]) : Atomic[T] = new Atomic(ref) +} + +class Atomic[T](val atomic : AtomicReference[T]) { + @tailrec + final def update(f: T => T) : T = { + val oldValue = atomic.get() + val newValue = f(oldValue) + if (atomic.compareAndSet(oldValue, newValue)) newValue else update(f) + } + + def get() = atomic.get() +}
\ No newline at end of file diff --git a/src/main/scala/kamon/metric/MetricsUtils.scala b/kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala index 5b4ceaf4..5b4ceaf4 100644 --- a/src/main/scala/kamon/metric/MetricsUtils.scala +++ b/kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala diff --git a/src/main/scala/kamon/metric/NewRelicReporter.scala b/kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala index 56dce913..70f3e54a 100644 --- a/src/main/scala/kamon/metric/NewRelicReporter.scala +++ b/kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala @@ -1,6 +1,7 @@ package kamon.metric -import com.codahale.metrics._ +import com.codahale.metrics +import metrics._ import java.util.concurrent.TimeUnit import java.util import com.newrelic.api.agent.NewRelic @@ -9,6 +10,8 @@ import scala.collection.JavaConverters._ class NewRelicReporter(registry: MetricRegistry, name: String,filter: MetricFilter, rateUnit: TimeUnit, durationUnit: TimeUnit) extends ScheduledReporter(registry, name, filter, rateUnit, durationUnit) { + + private[NewRelicReporter] def processMeter(name: String, meter: Meter) { NewRelic.recordMetric("Custom/Actor/MessagesPerSecond", meter.getMeanRate().toFloat) } @@ -17,15 +20,32 @@ class NewRelicReporter(registry: MetricRegistry, name: String,filter: MetricFilt println(s"Logging to NewRelic: ${counter.getCount}") } - def report(gauges: util.SortedMap[String, Gauge[_]], counters: util.SortedMap[String, Counter], histograms: util.SortedMap[String, Histogram], meters: util.SortedMap[String, Meter], timers: util.SortedMap[String, Timer]) { + + +/* def processGauge(name: String, gauge: Gauge[_]) = { + println(s"the value is: "+gauge.getValue) + NewRelic.recordMetric("Custom/ActorSystem/activeCount", gauge.getValue.asInstanceOf[Float]) + }*/ + + + def report(gauges: util.SortedMap[String, Gauge[_]], counters: util.SortedMap[String, Counter], histograms: util.SortedMap[String, metrics.Histogram], meters: util.SortedMap[String, Meter], timers: util.SortedMap[String, Timer]) { //Process Meters meters.asScala.map{case(name, meter) => processMeter(name, meter)} //Process Meters counters.asScala.map{case(name, counter) => processCounter(name, counter)} + + // Gauges + gauges.asScala.foreach{ case (name, gauge) => { + val measure: Float = gauge.getValue.asInstanceOf[Number].floatValue() + val fullMetricName = "Custom" + name + NewRelic.recordMetric(fullMetricName, measure) + }} } + + } object NewRelicReporter { - def apply(registry: MetricRegistry) = new NewRelicReporter(registry, "NewRelic-reporter", MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.MILLISECONDS) + def apply(registry: MetricRegistry) = new NewRelicReporter(registry, "NewRelic-reporter", metrics.MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.MILLISECONDS) }
\ No newline at end of file diff --git a/src/main/scala/spraytest/ClientTest.scala b/kamon-core/src/main/scala/spraytest/ClientTest.scala index 7a95fc76..07532d0a 100644 --- a/src/main/scala/spraytest/ClientTest.scala +++ b/kamon-core/src/main/scala/spraytest/ClientTest.scala @@ -5,6 +5,8 @@ import spray.client.pipelining._ import spray.httpx.SprayJsonSupport import spray.json._ import scala.concurrent.Future +import spray.can.Http +import akka.io.IO /** * BEGIN JSON Infrastructure @@ -34,7 +36,7 @@ class ClientTest extends App { import SprayJsonSupport._ - + val actor = IO(Http) val pipeline = sendReceive ~> unmarshal[Container] diff --git a/src/main/scala/spraytest/FutureTesting.scala b/kamon-core/src/main/scala/spraytest/FutureTesting.scala index f592f6d7..b864d6d6 100644 --- a/src/main/scala/spraytest/FutureTesting.scala +++ b/kamon-core/src/main/scala/spraytest/FutureTesting.scala @@ -63,7 +63,7 @@ object TraceableFuture { implicit def toRegularFuture[T](tf: TraceableFuture[T]) = tf.future def apply[T](body: => T)(implicit transactionContext: TransactionContext, executor: ExecutionContext) = { - val wrappedBody = contextSwitchWrapper(body, TransactionContext(transactionContext.id, Nil)) + val wrappedBody = contextSwitchWrapper(body, TransactionContext(transactionContext.dispatcherName, Nil)) new TraceableFuture(Future { wrappedBody }) } diff --git a/kamon-core/src/main/scala/test/PingPong.scala b/kamon-core/src/main/scala/test/PingPong.scala new file mode 100644 index 00000000..f9d6869c --- /dev/null +++ b/kamon-core/src/main/scala/test/PingPong.scala @@ -0,0 +1,34 @@ +package test + +import akka.actor.{Props, Actor, ActorSystem} + +object PingPong extends App { + + val as = ActorSystem("ping-pong") + + val pinger = as.actorOf(Props[Pinger]) + val ponger = as.actorOf(Props[Ponger]) + + pinger.tell(Pong, ponger) + + + Thread.sleep(30000) + as.shutdown() + + +} + +case object Ping +case object Pong + +class Pinger extends Actor { + def receive = { + case Pong => sender ! Ping + } +} + +class Ponger extends Actor { + def receive = { + case Ping => sender ! Pong + } +} diff --git a/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala b/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala new file mode 100644 index 00000000..0026d953 --- /dev/null +++ b/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala @@ -0,0 +1,45 @@ +package akka.instrumentation + +import org.scalatest.{WordSpecLike, Matchers} +import akka.actor.{Actor, Props, ActorSystem} + +import akka.testkit.{ImplicitSender, TestKit} +import kamon.{TraceContext, Kamon} + + +class ActorInstrumentationSpec extends TestKit(ActorSystem("ActorInstrumentationSpec")) with WordSpecLike with Matchers with ImplicitSender { + + "an instrumented actor ref" when { + "used inside the context of a transaction" should { + "propagate the trace context using bang" in new TraceContextEchoFixture { + echo ! "test" + + expectMsg(Some(testTraceContext)) + } + + "propagate the trace context using tell" in { + + } + + "propagate the trace context using ask" in { + + } + } + } + + trait TraceContextEchoFixture { + val testTraceContext = Kamon.newTraceContext() + val echo = system.actorOf(Props[TraceContextEcho]) + + Kamon.set(testTraceContext) + } + +} + +class TraceContextEcho extends Actor { + def receive = { + case msg ⇒ sender ! Kamon.context() + } +} + + diff --git a/kamon-core/src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala new file mode 100644 index 00000000..1eab6355 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/instrumentation/ActorSystemInstrumentationSpec.scala @@ -0,0 +1,22 @@ +package kamon.instrumentation + +import org.scalatest.{Matchers, WordSpec} +import akka.actor.ActorSystem +import kamon.Kamon + +class ActorSystemInstrumentationSpec extends WordSpec with Matchers { + + // TODO: Selection filters to exclude unwanted actor systems. Read from configuration. + + "the actor system instrumentation" should { + "register all actor systems created" in { + val as1 = ActorSystem("as1") + val as2 = ActorSystem("as2") + + + Kamon.Metric.actorSystem("as1") should not be (None) + Kamon.Metric.actorSystem("as2") should not be (None) + Kamon.Metric.actorSystem("unknown") should be (None) + } + } +} diff --git a/kamon-core/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala new file mode 100644 index 00000000..89ef61f3 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/instrumentation/DispatcherInstrumentationSpec.scala @@ -0,0 +1,34 @@ +package kamon.instrumentation + +import org.scalatest.{Matchers, WordSpec} +import akka.actor.{Actor, Props, ActorSystem} +import kamon.metric.MetricDirectory +import kamon.Kamon + +class DispatcherInstrumentationSpec extends WordSpec with Matchers{ + + + "the dispatcher instrumentation" should { + "instrument a dispatcher that belongs to a non-filtered actor system" in new SingleDispatcherActorSystem { + val x = Kamon.Metric.actorSystem("single-dispatcher").get.dispatchers + (1 to 10).foreach(actor ! _) + + val active = x.get("akka.actor.default-dispatcher").get.activeThreadCount.snapshot + println("Active max: "+active.max) + println("Active min: "+active.min) + + } + } + + + trait SingleDispatcherActorSystem { + val actorSystem = ActorSystem("single-dispatcher") + val actor = actorSystem.actorOf(Props(new Actor { + def receive = { + case a => sender ! a; + } + })) + + } +} + diff --git a/kamon-core/src/test/scala/kamon/instrumentation/MessageQueueInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/MessageQueueInstrumentationSpec.scala new file mode 100644 index 00000000..cc55ec92 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/instrumentation/MessageQueueInstrumentationSpec.scala @@ -0,0 +1,53 @@ +package kamon.instrumentation + +import org.scalatest.WordSpec +import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram} +import java.util.concurrent.ConcurrentLinkedQueue +import akka.dispatch.{UnboundedMessageQueueSemantics, QueueBasedMessageQueue, Envelope} +import java.util.Queue +import akka.actor.{ActorSystem, Actor} + +class MessageQueueInstrumentationSpec(val actorSystem: ActorSystem) extends WordSpec { + def this() = this(ActorSystem("MessageQueueInstrumentationSpec")) + + + /*"A MonitoredMessageQueue" should { + "update the related histogram when a message is enqueued" in { + new PopulatedMessageQueueFixture { + + assert(histogram.getSnapshot.getMax === 0) + + for(i <- 1 to 3) { enqueueDummyMessage } + + assert(histogram.getCount === 3) + assert(histogram.getSnapshot.getMax === 3) + assert(histogram.getSnapshot.getMin === 1) + } + } + + "update the related histogram when a message is dequeued" in { + new PopulatedMessageQueueFixture { + for(i <- 1 to 3) { enqueueDummyMessage } + assert(histogram.getSnapshot.getMax === 3) + + messageQueue.dequeue() + messageQueue.dequeue() + + assert(histogram.getCount === 5) + assert(histogram.getSnapshot.getMax === 3) + assert(histogram.getSnapshot.getMin === 1) + } + } + } + + trait PopulatedMessageQueueFixture { + + val histogram = new Histogram(new ExponentiallyDecayingReservoir()) +/* val delegate = new ConcurrentLinkedQueue[Envelope]() with QueueBasedMessageQueue with UnboundedMessageQueueSemantics { + final def queue: Queue[Envelope] = this + }*/ + val messageQueue = new MonitoredMessageQueue(delegate, histogram) + + def enqueueDummyMessage = messageQueue.enqueue(Actor.noSender, Envelope("", Actor.noSender, actorSystem)) + }*/ +} diff --git a/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala index 4fe9e617..de65aaca 100644 --- a/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala +++ b/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala @@ -1,9 +1,8 @@ package kamon.instrumentation import scala.concurrent.{Await, Promise, Future} -import org.scalatest.{OptionValues, WordSpec} -import org.scalatest.matchers.MustMatchers -import org.scalatest.concurrent.PatienceConfiguration +import org.scalatest.{Matchers, OptionValues, WordSpec} +import org.scalatest.concurrent.{ScalaFutures, PatienceConfiguration} import kamon.{Kamon, TraceContext} import java.util.UUID import scala.util.Success @@ -12,7 +11,7 @@ import java.util.concurrent.TimeUnit import akka.actor.ActorSystem -class RunnableInstrumentationSpec extends WordSpec with MustMatchers with ScalaFutures with PatienceConfiguration with OptionValues { +class RunnableInstrumentationSpec extends WordSpec with Matchers with ScalaFutures with PatienceConfiguration with OptionValues { "a instrumented runnable" when { "created in a thread that does have a TraceContext" must { @@ -20,7 +19,7 @@ class RunnableInstrumentationSpec extends WordSpec with MustMatchers with ScalaF "should be available during the run method execution" in { new FutureWithContextFixture { whenReady(futureWithContext) { result => - result.value must be === testContext + result.value should equal(testContext) } }} @@ -32,7 +31,7 @@ class RunnableInstrumentationSpec extends WordSpec with MustMatchers with ScalaF }) whenReady(onCompleteContext.future) { result => - result must be === testContext + result should equal(testContext) } }} } @@ -42,7 +41,7 @@ class RunnableInstrumentationSpec extends WordSpec with MustMatchers with ScalaF "not capture any TraceContext for the body execution" in { new FutureWithoutContextFixture{ whenReady(futureWithoutContext) { result => - result must be === None + result should equal(None) } }} @@ -54,7 +53,7 @@ class RunnableInstrumentationSpec extends WordSpec with MustMatchers with ScalaF }) whenReady(onCompleteContext.future) { result => - result must be === None + result should equal(None) } }} } diff --git a/kamon-uow/src/main/scala/kamon/logging/UowActorLogging.scala b/kamon-uow/src/main/scala/kamon/logging/UowActorLogging.scala new file mode 100644 index 00000000..e117db1b --- /dev/null +++ b/kamon-uow/src/main/scala/kamon/logging/UowActorLogging.scala @@ -0,0 +1,14 @@ +package kamon.logging + +import akka.actor.Actor +import kamon.Kamon + +trait UowActorLogging { + self: Actor => + + def logWithUOW(text: String) = { + val uow = Kamon.context.map(_.userContext).getOrElse("NA") + println(s"=======>[$uow] - $text") + } + +} diff --git a/kamon-uow/src/main/scala/kamon/logging/UowDirectives.scala b/kamon-uow/src/main/scala/kamon/logging/UowDirectives.scala new file mode 100644 index 00000000..e79602ea --- /dev/null +++ b/kamon-uow/src/main/scala/kamon/logging/UowDirectives.scala @@ -0,0 +1,28 @@ +package kamon.logging + +import java.util.concurrent.atomic.AtomicLong +import spray.routing.Directive0 +import spray.routing.directives.BasicDirectives +import java.net.InetAddress +import scala.util.Try +import kamon.Kamon + +trait UowDirectives extends BasicDirectives { + def uow: Directive0 = mapRequest { request => + val generatedUow = Some(UowDirectives.newUow) + println("Generated UOW: "+generatedUow) + Kamon.set(Kamon.newTraceContext().copy(userContext = generatedUow)) + + + request + } +} + +object UowDirectives { + val uowCounter = new AtomicLong + + val hostnamePrefix = Try(InetAddress.getLocalHost.getHostName.toString).getOrElse("unknown-localhost") + + def newUow = "%s-%s".format(hostnamePrefix, uowCounter.incrementAndGet()) + +} diff --git a/project/Build.scala b/project/Build.scala index 37765ccf..c2822185 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -7,15 +7,24 @@ object Build extends Build { import Settings._ import Dependencies._ - lazy val root = Project("kamon", file(".")) + lazy val root = Project("root", file(".")) + .aggregate(kamonCore, kamonUow) + .settings(basicSettings: _*) + + lazy val kamonCore = Project("kamon-core", file("kamon-core")) .settings(basicSettings: _*) .settings(revolverSettings: _*) .settings(aspectJSettings: _*) .settings(newrelicSettings: _*) + .settings( libraryDependencies ++= - compile(akkaActor, akkaAgent, sprayCan, sprayClient, sprayRouting, sprayServlet, aspectJ, metrics, newrelic, sprayJson) ++ - test(scalatest, sprayTestkit)) - + compile(akkaActor, akkaAgent, sprayCan, sprayClient, sprayRouting, sprayServlet, aspectJ, metrics, sprayJson) ++ + test(scalatest, akkaTestKit, sprayTestkit)) + lazy val kamonUow = Project("kamon-uow", file("kamon-uow")) + .settings(basicSettings: _*) + .settings(libraryDependencies ++= + compile(akkaActor, akkaSlf4j, sprayRouting)) + .dependsOn(kamonCore) }
\ No newline at end of file diff --git a/project/Dependencies.scala b/project/Dependencies.scala index a0d51a39..c3162065 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -7,22 +7,22 @@ object Dependencies { "spray nightlies repo" at "http://nightlies.spray.io" ) - val sprayCan = "io.spray" % "spray-can" % "1.1-20130509" - val sprayRouting = "io.spray" % "spray-routing" % "1.1-20130509" - val sprayTestkit = "io.spray" % "spray-testkit" % "1.1-20130509" - val sprayClient = "io.spray" % "spray-client" % "1.1-20130509" - val sprayServlet = "io.spray" % "spray-servlet" % "1.1-20130509" + val sprayCan = "io.spray" % "spray-can" % "1.1-M8" + val sprayRouting = "io.spray" % "spray-routing" % "1.1-M8" + val sprayTestkit = "io.spray" % "spray-testkit" % "1.1-M8" + val sprayClient = "io.spray" % "spray-client" % "1.1-M8" + val sprayServlet = "io.spray" % "spray-servlet" % "1.1-M8" val sprayJson = "io.spray" %% "spray-json" % "1.2.3" val scalaReflect = "org.scala-lang" % "scala-reflect" % "2.10.1" - val akkaActor = "com.typesafe.akka" %% "akka-actor" % "2.1.2" - val akkaAgent = "com.typesafe.akka" %% "akka-agent" % "2.1.2" - val akkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % "2.1.2" - val akkaTestKit = "com.typesafe.akka" %% "akka-testkit" % "2.1.2" - val scalatest = "org.scalatest" % "scalatest_2.10" % "2.0.M5b" + val akkaActor = "com.typesafe.akka" %% "akka-actor" % "2.2.0" + val akkaAgent = "com.typesafe.akka" %% "akka-agent" % "2.2.0" + val akkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % "2.2.0" + val akkaTestKit = "com.typesafe.akka" %% "akka-testkit" % "2.2.0" + val scalatest = "org.scalatest" % "scalatest_2.10" % "2.0.M6-SNAP22" val logback = "ch.qos.logback" % "logback-classic" % "1.0.10" val aspectJ = "org.aspectj" % "aspectjrt" % "1.7.2" - val metrics = "com.codahale.metrics" % "metrics-core" % "3.0.0-BETA2" - val newrelic = "com.newrelic.agent.java" % "newrelic-api" % "2.17.2" + val metrics = "com.codahale.metrics" % "metrics-core" % "3.0.0" + val newrelic = "com.newrelic.agent.java" % "newrelic-api" % "2.19.0" def compile (deps: ModuleID*): Seq[ModuleID] = deps map (_ % "compile") def provided (deps: ModuleID*): Seq[ModuleID] = deps map (_ % "provided") diff --git a/project/NewRelic.scala b/project/NewRelic.scala index 766eb28d..74f8fc30 100644 --- a/project/NewRelic.scala +++ b/project/NewRelic.scala @@ -8,6 +8,6 @@ object NewRelic { lazy val newrelicSettings = SbtNewrelic.newrelicSettings ++ Seq( javaOptions in run <++= jvmOptions in newrelic, - newrelicVersion in newrelic := "2.18.0" + newrelicVersion in newrelic := "2.19.0" ) } diff --git a/project/Settings.scala b/project/Settings.scala index 640a8013..5fadc25d 100644 --- a/project/Settings.scala +++ b/project/Settings.scala @@ -7,8 +7,8 @@ object Settings { lazy val basicSettings = seq( version := VERSION, - organization := "com.despegar", - scalaVersion := "2.10.0", + organization := "kamon", + scalaVersion := "2.10.2", resolvers ++= Dependencies.resolutionRepos, fork in run := true, scalacOptions := Seq( diff --git a/project/plugins.sbt b/project/plugins.sbt index f8ce9e3c..b8910961 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -8,3 +8,4 @@ addSbtPlugin("com.typesafe.sbt" % "sbt-aspectj" % "0.9.0") addSbtPlugin("com.ivantopo.sbt" % "sbt-newrelic" % "0.0.1") + diff --git a/src/main/resources/META-INF/aop.xml b/src/main/resources/META-INF/aop.xml deleted file mode 100644 index 1413f424..00000000 --- a/src/main/resources/META-INF/aop.xml +++ /dev/null @@ -1,27 +0,0 @@ -<!DOCTYPE aspectj PUBLIC "-//AspectJ//DTD//EN" "http://www.eclipse.org/aspectj/dtd/aspectj.dtd"> - -<aspectj> - <!--<weaver options="-verbose -showWeaveInfo"> - <dump within="*"/> - </weaver>--> - - <aspects> - - <!--<aspect name="akka.ActorSystemAspect"/> - <!–<aspect name="akka.MailboxAspect"/>–>--> - <!--<aspect name="akka.PoolMonitorInstrumentation"/>--> - <aspect name="akka.ActorInstrumentation" /> - <aspect name="akka.instrumentation.ActorRefTellInstrumentation"/> - <aspect name="akka.instrumentation.ActorCellInvokeInstrumentation"/> - <aspect name="kamon.instrumentation.RunnableInstrumentation" /> - - <include within="*"/> - <exclude within="javax..*"/> - <exclude within="org.aspectj..*"/> - <exclude within="scala..*"/> - <exclude within="scalaz..*"/> - <exclude within="scalad..*"/> - <exclude within="play..*"/> - </aspects> - -</aspectj> diff --git a/src/main/scala/akka/ActorInstrumentation.scala b/src/main/scala/akka/ActorInstrumentation.scala deleted file mode 100644 index afe0e459..00000000 --- a/src/main/scala/akka/ActorInstrumentation.scala +++ /dev/null @@ -1,46 +0,0 @@ -package akka - -import actor.ActorCell -import org.aspectj.lang.annotation.{After, Around, Pointcut, Aspect} -import org.aspectj.lang.ProceedingJoinPoint -import kamon.metric.Metrics.{ metricsRegistry => meterRegistry } -import com.codahale.metrics.Meter -import kamon.metric.MetricsUtils._ - -@Aspect("perthis(actorCellCreation(*))") -class ActorInstrumentation { - - /** - * Aspect members - */ - - private val actorMeter:Meter = new Meter - - /** - * Pointcuts - */ - @Pointcut("execution(akka.actor.ActorCell+.new(..)) && this(actor)") - def actorCellCreation(actor:ActorCell):Unit = {} - - @Pointcut("execution(* akka.actor.ActorCell+.receiveMessage(..))") - def actorReceive():Unit = {} - - /** - * Advices - */ - @After("actorCellCreation(actor)") - def afterCellCreation(actor:ActorCell):Unit ={ - val actorName:String = actor.self.path.toString - - meterRegistry.register(s"meter-for-${actorName}", actorMeter) - } - - @Around("actorReceive()") - def around(pjp: ProceedingJoinPoint) = { - import pjp._ - - markMeter(actorMeter) { - proceed - } - } - }
\ No newline at end of file diff --git a/src/main/scala/akka/ActorSystemAspect.scala b/src/main/scala/akka/ActorSystemAspect.scala deleted file mode 100644 index 9d1d515d..00000000 --- a/src/main/scala/akka/ActorSystemAspect.scala +++ /dev/null @@ -1,18 +0,0 @@ -package akka - -import org.aspectj.lang.annotation._ -import actor.ActorSystemImpl - -@Aspect -class ActorSystemAspect { - println("Created ActorSystemAspect") - - @Pointcut("execution(* akka.actor.ActorRefProvider+.init(..)) && !within(ActorSystemAspect)") - protected def actorSystem():Unit = {} - - @After("actorSystem() && args(system)") - def collectActorSystem(system: ActorSystemImpl):Unit = { - Tracer.collectActorSystem(system) - Tracer.start() - } -} diff --git a/src/main/scala/akka/MailboxAspect.scala b/src/main/scala/akka/MailboxAspect.scala deleted file mode 100644 index 5ca6d6ab..00000000 --- a/src/main/scala/akka/MailboxAspect.scala +++ /dev/null @@ -1,16 +0,0 @@ -package akka - -import org.aspectj.lang.annotation._ - -@Aspect("perthis(mailboxMonitor())") -class MailboxAspect { - println("Created MailboxAspect") - - @Pointcut("execution(akka.dispatch.Mailbox.new(..)) && !within(MailboxAspect)") - protected def mailboxMonitor():Unit = {} - - @After("mailboxMonitor() && this(mb)") - def afterInitialization(mb: akka.dispatch.Mailbox) : Unit = { - Tracer.collectMailbox(mb) - } -}
\ No newline at end of file diff --git a/src/main/scala/akka/MailboxMetrics.scala b/src/main/scala/akka/MailboxMetrics.scala deleted file mode 100644 index 6bf65cc7..00000000 --- a/src/main/scala/akka/MailboxMetrics.scala +++ /dev/null @@ -1,35 +0,0 @@ -package akka - -import akka.dispatch.Mailbox -import com.newrelic.api.agent.NewRelic - -case class MailboxMetrics(mailboxes:Map[String,Mailbox]) - - -object MailboxMetrics { - def apply(mailboxes: List[Mailbox]) = { - new MailboxMetrics(mailboxes.take(mailboxes.length - 1).map{m => (m.actor.self.path.toString -> m)}.toMap) //TODO:research why collect an ActorSystemImpl - } - - def toMap(mb: Mailbox):Map[String,Int] = Map[String,Int]( - "NumberOfMessages" -> mb.numberOfMessages, - "MailboxDispatcherThroughput" -> mb.dispatcher.throughput, - "SuspendCount" -> mb.suspendCount - ) -} - -class MailboxSenderMetrics(mailboxes:List[Mailbox]) extends Runnable { - def run() { - val mbm = MailboxMetrics(mailboxes) - mbm.mailboxes.map { case(actorName,mb) => { - println(s"Sending metrics to Newrelic MailBoxMonitor for Actor -> ${actorName}") - - MailboxMetrics.toMap(mb).map {case(property, value) => - NewRelic.recordMetric(s"${actorName}:Mailbox:${property}", value) - } - } - } - } -} - - diff --git a/src/main/scala/akka/PoolMetrics.scala b/src/main/scala/akka/PoolMetrics.scala deleted file mode 100644 index 422e34fd..00000000 --- a/src/main/scala/akka/PoolMetrics.scala +++ /dev/null @@ -1,29 +0,0 @@ -package akka - -import scala.concurrent.forkjoin.ForkJoinPool -import com.newrelic.api.agent.NewRelic - -case class PoolMetrics(poolName:String, data:Map[String,Int]) - -object PoolMetrics { - def apply(pool: ForkJoinPool) = new PoolMetrics(pool.getClass.getSimpleName, toMap(pool)) - - def toMap(pool: scala.concurrent.forkjoin.ForkJoinPool):Map[String,Int] = Map[String,Int]( - "ActiveThreadCount" -> pool.getActiveThreadCount, - "Parallelism" -> pool.getParallelism, - "PoolSize" -> pool.getPoolSize, - "QueuedSubmissionCount" -> pool.getQueuedSubmissionCount, - "StealCount" -> pool.getStealCount.toInt, - "QueuedTaskCount" -> pool.getQueuedTaskCount.toInt, - "RunningThreadCount" -> pool.getRunningThreadCount - ) -} - -class PoolMetricsSender(forkJoinPool:ForkJoinPool) extends Runnable { - def run() { - val pool = PoolMetrics(forkJoinPool) - println(s"Sending Metrics to NewRelic -> ${pool}") - pool.data.map{case(k,v) => NewRelic.recordMetric(s"${pool.poolName}:${k}",v)} - } -} - diff --git a/src/main/scala/akka/PoolMonitorInstrumentation.scala b/src/main/scala/akka/PoolMonitorInstrumentation.scala deleted file mode 100644 index 167083e8..00000000 --- a/src/main/scala/akka/PoolMonitorInstrumentation.scala +++ /dev/null @@ -1,16 +0,0 @@ -package akka - -import org.aspectj.lang.annotation._ - -@Aspect("perthis(poolMonitor(*))") -class PoolMonitorAspect { - println("Created PoolMonitorAspect") - - @Pointcut("execution(scala.concurrent.forkjoin.ForkJoinPool.new(..)) && this(pool)") - protected def poolMonitor(pool:scala.concurrent.forkjoin.ForkJoinPool):Unit = {} - - @After("poolMonitor(pool)") - def beforePoolInstantiation(pool: scala.concurrent.forkjoin.ForkJoinPool):Unit = { - - } -} diff --git a/src/main/scala/akka/Tracer.scala b/src/main/scala/akka/Tracer.scala deleted file mode 100644 index 3b301247..00000000 --- a/src/main/scala/akka/Tracer.scala +++ /dev/null @@ -1,24 +0,0 @@ -package akka - -import actor.ActorSystemImpl -import scala.concurrent.forkjoin.ForkJoinPool -import scala.concurrent.duration._ -import akka.dispatch.Mailbox -import scala._ - -object Tracer { - protected[this] var mailboxes:List[Mailbox] = Nil - protected[this] var tracerActorSystem: ActorSystemImpl = _ - protected[this] var forkJoinPool:ForkJoinPool = _ - - def collectPool(pool: ForkJoinPool) = forkJoinPool = pool - def collectActorSystem(actorSystem: ActorSystemImpl) = tracerActorSystem = actorSystem - def collectMailbox(mb: akka.dispatch.Mailbox) = mailboxes ::= mb - - def start():Unit ={ - implicit val dispatcher = tracerActorSystem.dispatcher - - tracerActorSystem.scheduler.schedule(6 seconds, 5 second, new MailboxSenderMetrics(mailboxes)) - tracerActorSystem.scheduler.schedule(7 seconds, 5 second, new PoolMetricsSender(forkJoinPool)) - } -}
\ No newline at end of file diff --git a/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala b/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala deleted file mode 100644 index f631b79a..00000000 --- a/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala +++ /dev/null @@ -1,57 +0,0 @@ -package akka.instrumentation - -import org.aspectj.lang.annotation.{Around, Pointcut, Aspect} -import org.aspectj.lang.ProceedingJoinPoint -import akka.actor.{ActorRef} -import kamon.{Kamon, TraceContext} -import akka.dispatch.Envelope - -case class TraceableMessage(traceContext: TraceContext, message: Any) - - -@Aspect -class ActorRefTellInstrumentation { - println("Created ActorAspect") - - @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && !within(akka.pattern.PromiseActorRef) && args(message, sender)") - def sendingMessageToActorRef(message: Any, sender: ActorRef) = {} - - @Around("sendingMessageToActorRef(message, sender)") - def around(pjp: ProceedingJoinPoint, message: Any, sender: ActorRef): Unit = { - import pjp._ - - Kamon.context() match { - case Some(ctx) => { - val traceableMessage = TraceableMessage(ctx, message) - proceed(getArgs.updated(0, traceableMessage)) - } - case None => proceed - } - } -} - - -@Aspect -class ActorCellInvokeInstrumentation { - - @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && args(envelope)") - def invokingActorBehaviourAtActorCell(envelope: Envelope) = {} - - - @Around("invokingActorBehaviourAtActorCell(envelope)") - def around(pjp: ProceedingJoinPoint, envelope: Envelope) = { - import pjp._ - - envelope match { - case Envelope(TraceableMessage(ctx, msg), sender) => { - Kamon.set(ctx) - - val originalEnvelope = envelope.copy(message = msg) - proceed(getArgs.updated(0, originalEnvelope)) - - Kamon.clear - } - case _ => proceed - } - } -}
\ No newline at end of file diff --git a/src/main/scala/kamon/Kamon.scala b/src/main/scala/kamon/Kamon.scala deleted file mode 100644 index ef5f8044..00000000 --- a/src/main/scala/kamon/Kamon.scala +++ /dev/null @@ -1,31 +0,0 @@ -package kamon - -import akka.actor.{Props, ActorSystem} - -object Kamon { - - val ctx = new ThreadLocal[Option[TraceContext]] { - override def initialValue() = None - } - - implicit lazy val actorSystem = ActorSystem("kamon") - - - def context() = ctx.get() - def clear = ctx.remove() - def set(traceContext: TraceContext) = ctx.set(Some(traceContext)) - - def start = set(newTraceContext) - def stop = ctx.get match { - case Some(context) => context.close - case None => - } - - def newTraceContext(): TraceContext = TraceContext() - - - val publisher = actorSystem.actorOf(Props[TransactionPublisher]) - - def publish(tx: FullTransaction) = publisher ! tx - -} diff --git a/src/main/scala/kamon/metric/Metrics.scala b/src/main/scala/kamon/metric/Metrics.scala deleted file mode 100644 index cf04659b..00000000 --- a/src/main/scala/kamon/metric/Metrics.scala +++ /dev/null @@ -1,14 +0,0 @@ -package kamon.metric - -import java.util.concurrent.TimeUnit -import com.codahale.metrics._ - -object Metrics { - val metricsRegistry: MetricRegistry = new MetricRegistry - - val consoleReporter = ConsoleReporter.forRegistry(metricsRegistry) - val newrelicReporter = NewRelicReporter(metricsRegistry) - - newrelicReporter.start(5, TimeUnit.SECONDS) - consoleReporter.build().start(5, TimeUnit.SECONDS) -}
\ No newline at end of file diff --git a/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala b/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala deleted file mode 100644 index 4cc15a2f..00000000 --- a/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala +++ /dev/null @@ -1,42 +0,0 @@ -package akka.instrumentation - -import org.scalatest.WordSpec -import org.scalatest.matchers.{ShouldMatchers, MustMatchers} -import akka.actor.{Actor, Props, ActorSystem} -import kamon.metric.Metrics._ -import scala.collection.JavaConverters._ - - -class ActorInstrumentationSpec extends WordSpec with MustMatchers with ShouldMatchers { - val system = ActorSystem() - import system._ - - val echoRef = actorOf(Props(new EchoActor), "Echo-Actor") - val meterForEchoActor = "meter-for-akka://default/user/Echo-Actor" - val totalMessages = 1000 - - "an instrumented Actor" should { - "send a message and record a metric on the Metrics Registry with the number of sent messages" in { - - (1 to totalMessages).foreach {x:Int => - echoRef ! s"Message ${x}" - } - - //to ensure that all messages was received - Thread.sleep(1000) - - val messages = metricsRegistry.getMeters.asScala.get(meterForEchoActor).get.getCount - - messages should equal(totalMessages) - } - } - -} - -class EchoActor extends Actor { - def receive = { - case msg ⇒ sender ! msg - } -} - - diff --git a/src/test/scala/kamon/instrumentation/ScalaFutures.scala b/src/test/scala/kamon/instrumentation/ScalaFutures.scala deleted file mode 100644 index 169b709c..00000000 --- a/src/test/scala/kamon/instrumentation/ScalaFutures.scala +++ /dev/null @@ -1,32 +0,0 @@ -package kamon.instrumentation - -import scala.concurrent.{Await, Future} -import scala.concurrent.duration.Duration -import scala.util.{Failure, Success} -import org.scalatest.concurrent.Futures -import java.util.concurrent.TimeUnit - -trait ScalaFutures extends Futures { - implicit def scalaFutureToFutureConcept[T](future: Future[T]): FutureConcept[T] = new FutureConcept[T] { - def eitherValue: Option[Either[Throwable, T]] = { - if(!future.isCompleted) - None - else - future.value match { - case None => None - case Some(t) => t match { - case Success(v) => Some(Right(v)) - case Failure(e) => Some(Left(e)) - } - } - } - - def isExpired: Boolean = false // Scala futures cant expire - - def isCanceled: Boolean = false // Scala futures cannot be cancelled - - override def futureValue(implicit config: PatienceConfig): T = { - Await.result(future, Duration(config.timeout.totalNanos, TimeUnit.NANOSECONDS)) - } - } -} |