diff options
18 files changed, 147 insertions, 274 deletions
diff --git a/project/Build.scala b/project/Build.scala index b1ce638e..f3e6e8da 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -15,7 +15,7 @@ object Build extends Build { .settings( libraryDependencies ++= compile(akkaActor, akkaAgent, sprayCan, sprayClient, sprayRouting, sprayServlet, aspectJ, metrics, sprayJson) ++ - test(scalatest, sprayTestkit)) + test(scalatest, akkaTestKit, sprayTestkit)) }
\ No newline at end of file diff --git a/src/main/resources/META-INF/aop.xml b/src/main/resources/META-INF/aop.xml index b4b3d879..8619b2c8 100644 --- a/src/main/resources/META-INF/aop.xml +++ b/src/main/resources/META-INF/aop.xml @@ -2,29 +2,19 @@ <aspectj> <weaver options="-verbose -showWeaveInfo"> - <dump within="*"/> + <!--<dump within="*"/>--> </weaver> <aspects> - <!--<aspect name="akka.ActorSystemAspect"/> - <!–<aspect name="akka.MailboxAspect"/>–>--> - <!--<aspect name="akka.PoolMonitorAspect"/>--> - <aspect name="akka.ActorInstrumentation" /> - <aspect name="akka.instrumentation.ActorRefTellInstrumentation"/> - <aspect name="akka.instrumentation.ActorCellInvokeInstrumentation"/> + <aspect name="kamon.instrumentation.ActorRefTellInstrumentation"/> + <aspect name="kamon.instrumentation.ActorCellInvokeInstrumentation"/> <aspect name="kamon.instrumentation.RunnableInstrumentation" /> - <!--<aspect name="kamon.instrumentation.DispatcherInstrumentation" />--> - <!--<aspect name ="akka.dispatch.FactoryInstrumentation" />--> - - <aspect name="akka.instrumentation.MessageQueueInstrumentation" /> + <aspect name="kamon.instrumentation.MessageQueueInstrumentation" /> <!-- ExecutorService Instrumentation for Akka. --> - <aspect name="akka.dispatch.ExecutorServiceFactoryProviderInstrumentation"/> - <aspect name="akka.dispatch.NamedExecutorServiceFactoryDelegateInstrumentation"/> - - - + <aspect name="kamon.instrumentation.ExecutorServiceFactoryProviderInstrumentation"/> + <aspect name="kamon.instrumentation.NamedExecutorServiceFactoryDelegateInstrumentation"/> diff --git a/src/main/scala/akka/ActorInstrumentation.scala b/src/main/scala/akka/ActorInstrumentation.scala deleted file mode 100644 index aa14f237..00000000 --- a/src/main/scala/akka/ActorInstrumentation.scala +++ /dev/null @@ -1,46 +0,0 @@ -package akka - -import actor.ActorCell -import org.aspectj.lang.annotation.{After, Around, Pointcut, Aspect} -import org.aspectj.lang.ProceedingJoinPoint -import kamon.metric.Metrics.{ registry => meterRegistry } -import com.codahale.metrics.Meter -import kamon.metric.MetricsUtils._ - -@Aspect("perthis(actorCellCreation(*))") -class ActorInstrumentation { - - /** - * Aspect members - */ - - private val actorMeter:Meter = new Meter - - /** - * Pointcuts - */ - @Pointcut("execution(akka.actor.ActorCell+.new(..)) && this(actor)") - def actorCellCreation(actor:ActorCell):Unit = {} - - @Pointcut("execution(* akka.actor.ActorCell+.receiveMessage(..))") - def actorReceive():Unit = {} - - /** - * Advices - */ - @After("actorCellCreation(actor)") - def afterCellCreation(actor:ActorCell):Unit ={ - val actorName:String = actor.self.path.toString - - meterRegistry.register(s"meter-for-${actorName}", actorMeter) - } - - @Around("actorReceive()") - def around(pjp: ProceedingJoinPoint) = { - import pjp._ - - markMeter(actorMeter) { - proceed - } - } - }
\ No newline at end of file diff --git a/src/main/scala/akka/ActorSystemAspect.scala b/src/main/scala/akka/ActorSystemAspect.scala deleted file mode 100644 index 9d1d515d..00000000 --- a/src/main/scala/akka/ActorSystemAspect.scala +++ /dev/null @@ -1,18 +0,0 @@ -package akka - -import org.aspectj.lang.annotation._ -import actor.ActorSystemImpl - -@Aspect -class ActorSystemAspect { - println("Created ActorSystemAspect") - - @Pointcut("execution(* akka.actor.ActorRefProvider+.init(..)) && !within(ActorSystemAspect)") - protected def actorSystem():Unit = {} - - @After("actorSystem() && args(system)") - def collectActorSystem(system: ActorSystemImpl):Unit = { - Tracer.collectActorSystem(system) - Tracer.start() - } -} diff --git a/src/main/scala/akka/MailboxAspect.scala b/src/main/scala/akka/MailboxAspect.scala deleted file mode 100644 index 5ca6d6ab..00000000 --- a/src/main/scala/akka/MailboxAspect.scala +++ /dev/null @@ -1,16 +0,0 @@ -package akka - -import org.aspectj.lang.annotation._ - -@Aspect("perthis(mailboxMonitor())") -class MailboxAspect { - println("Created MailboxAspect") - - @Pointcut("execution(akka.dispatch.Mailbox.new(..)) && !within(MailboxAspect)") - protected def mailboxMonitor():Unit = {} - - @After("mailboxMonitor() && this(mb)") - def afterInitialization(mb: akka.dispatch.Mailbox) : Unit = { - Tracer.collectMailbox(mb) - } -}
\ No newline at end of file diff --git a/src/main/scala/akka/MailboxMetrics.scala b/src/main/scala/akka/MailboxMetrics.scala deleted file mode 100644 index 6bf65cc7..00000000 --- a/src/main/scala/akka/MailboxMetrics.scala +++ /dev/null @@ -1,35 +0,0 @@ -package akka - -import akka.dispatch.Mailbox -import com.newrelic.api.agent.NewRelic - -case class MailboxMetrics(mailboxes:Map[String,Mailbox]) - - -object MailboxMetrics { - def apply(mailboxes: List[Mailbox]) = { - new MailboxMetrics(mailboxes.take(mailboxes.length - 1).map{m => (m.actor.self.path.toString -> m)}.toMap) //TODO:research why collect an ActorSystemImpl - } - - def toMap(mb: Mailbox):Map[String,Int] = Map[String,Int]( - "NumberOfMessages" -> mb.numberOfMessages, - "MailboxDispatcherThroughput" -> mb.dispatcher.throughput, - "SuspendCount" -> mb.suspendCount - ) -} - -class MailboxSenderMetrics(mailboxes:List[Mailbox]) extends Runnable { - def run() { - val mbm = MailboxMetrics(mailboxes) - mbm.mailboxes.map { case(actorName,mb) => { - println(s"Sending metrics to Newrelic MailBoxMonitor for Actor -> ${actorName}") - - MailboxMetrics.toMap(mb).map {case(property, value) => - NewRelic.recordMetric(s"${actorName}:Mailbox:${property}", value) - } - } - } - } -} - - diff --git a/src/main/scala/akka/PoolMetrics.scala b/src/main/scala/akka/PoolMetrics.scala deleted file mode 100644 index 422e34fd..00000000 --- a/src/main/scala/akka/PoolMetrics.scala +++ /dev/null @@ -1,29 +0,0 @@ -package akka - -import scala.concurrent.forkjoin.ForkJoinPool -import com.newrelic.api.agent.NewRelic - -case class PoolMetrics(poolName:String, data:Map[String,Int]) - -object PoolMetrics { - def apply(pool: ForkJoinPool) = new PoolMetrics(pool.getClass.getSimpleName, toMap(pool)) - - def toMap(pool: scala.concurrent.forkjoin.ForkJoinPool):Map[String,Int] = Map[String,Int]( - "ActiveThreadCount" -> pool.getActiveThreadCount, - "Parallelism" -> pool.getParallelism, - "PoolSize" -> pool.getPoolSize, - "QueuedSubmissionCount" -> pool.getQueuedSubmissionCount, - "StealCount" -> pool.getStealCount.toInt, - "QueuedTaskCount" -> pool.getQueuedTaskCount.toInt, - "RunningThreadCount" -> pool.getRunningThreadCount - ) -} - -class PoolMetricsSender(forkJoinPool:ForkJoinPool) extends Runnable { - def run() { - val pool = PoolMetrics(forkJoinPool) - println(s"Sending Metrics to NewRelic -> ${pool}") - pool.data.map{case(k,v) => NewRelic.recordMetric(s"${pool.poolName}:${k}",v)} - } -} - diff --git a/src/main/scala/akka/PoolMonitorInstrumentation.scala b/src/main/scala/akka/PoolMonitorInstrumentation.scala deleted file mode 100644 index e78e0d7e..00000000 --- a/src/main/scala/akka/PoolMonitorInstrumentation.scala +++ /dev/null @@ -1,30 +0,0 @@ -package akka - -import org.aspectj.lang.annotation._ -import akka.dispatch.MonitorableThreadFactory -import kamon.metric.Metrics -import scala.concurrent.forkjoin.ForkJoinPool -import com.codahale.metrics.Gauge - -@Aspect("perthis(poolMonitor(scala.concurrent.forkjoin.ForkJoinPool))") -class PoolMonitorAspect { - println("Created PoolMonitorAspect") - - - @Pointcut("execution(scala.concurrent.forkjoin.ForkJoinPool.new(..)) && this(pool)") - protected def poolMonitor(pool: scala.concurrent.forkjoin.ForkJoinPool):Unit = {} - - @After("poolMonitor(pool)") - def beforePoolInstantiation(pool: scala.concurrent.forkjoin.ForkJoinPool):Unit = { - pool.getFactory match { - case m: MonitorableThreadFactory => registerForMonitoring(pool, m.name) - } - } - - def registerForMonitoring(fjp: ForkJoinPool, name: String) { - Metrics.registry.register(s"/metrics/actorsystem/{actorsystem-name}/dispatcher/$name", - new Gauge[Long] { - def getValue: Long = fjp.getPoolSize - }) - } -} diff --git a/src/main/scala/akka/Tracer.scala b/src/main/scala/akka/Tracer.scala deleted file mode 100644 index 3b301247..00000000 --- a/src/main/scala/akka/Tracer.scala +++ /dev/null @@ -1,24 +0,0 @@ -package akka - -import actor.ActorSystemImpl -import scala.concurrent.forkjoin.ForkJoinPool -import scala.concurrent.duration._ -import akka.dispatch.Mailbox -import scala._ - -object Tracer { - protected[this] var mailboxes:List[Mailbox] = Nil - protected[this] var tracerActorSystem: ActorSystemImpl = _ - protected[this] var forkJoinPool:ForkJoinPool = _ - - def collectPool(pool: ForkJoinPool) = forkJoinPool = pool - def collectActorSystem(actorSystem: ActorSystemImpl) = tracerActorSystem = actorSystem - def collectMailbox(mb: akka.dispatch.Mailbox) = mailboxes ::= mb - - def start():Unit ={ - implicit val dispatcher = tracerActorSystem.dispatcher - - tracerActorSystem.scheduler.schedule(6 seconds, 5 second, new MailboxSenderMetrics(mailboxes)) - tracerActorSystem.scheduler.schedule(7 seconds, 5 second, new PoolMetricsSender(forkJoinPool)) - } -}
\ No newline at end of file diff --git a/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala b/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala index 218c09cc..b345eaae 100644 --- a/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala +++ b/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala @@ -1,4 +1,4 @@ -package akka.instrumentation +package kamon.instrumentation import org.aspectj.lang.annotation.{Before, Around, Pointcut, Aspect} import org.aspectj.lang.ProceedingJoinPoint @@ -8,32 +8,18 @@ import akka.dispatch.Envelope import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram} import kamon.metric.{MetricDirectory, Metrics} -case class TraceableEnvelope(traceContext: TraceContext, message: Any, timeStamp: Long = System.nanoTime()) +case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timeStamp: Long = System.nanoTime()) @Aspect class ActorRefTellInstrumentation { - println("Created ActorAspect") + import ProceedingJoinPointPimp._ @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && !within(akka.pattern.PromiseActorRef) && args(message, sender)") def sendingMessageToActorRef(message: Any, sender: ActorRef) = {} @Around("sendingMessageToActorRef(message, sender)") - def around(pjp: ProceedingJoinPoint, message: Any, sender: ActorRef): Unit = { - import pjp._ - - Kamon.context() match { - case Some(ctx) => { - val traceableMessage = TraceableEnvelope(ctx, message) - - // update the args with the new message - val args = getArgs - args.update(0, traceableMessage) - proceed(args) - } - case None => proceed - } - } + def around(pjp: ProceedingJoinPoint, message: Any, sender: ActorRef): Unit = pjp.proceedWith(TraceableMessage(Kamon.context, message)) } @@ -41,8 +27,7 @@ class ActorRefTellInstrumentation { class ActorCellInvokeInstrumentation { val latencyHistogram: Histogram = new Histogram(new ExponentiallyDecayingReservoir) - val messagesPer - @volatile var shouldTrack = false + var shouldTrack = false @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, parent)") def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, parent: ActorRef): Unit = {} @@ -67,20 +52,23 @@ class ActorCellInvokeInstrumentation { @Around("invokingActorBehaviourAtActorCell(envelope)") def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = { - import pjp._ + import ProceedingJoinPointPimp._ envelope match { - case Envelope(TraceableEnvelope(ctx, msg, timeStamp), sender) => { + case Envelope(TraceableMessage(ctx, msg, timeStamp), sender) => { latencyHistogram.update(System.nanoTime() - timeStamp) - Kamon.set(ctx) - val originalEnvelope = envelope.copy(message = msg) - proceed(getArgs.updated(0, originalEnvelope)) - - Kamon.clear + ctx match { + case Some(c) => { + Kamon.set(c) + pjp.proceedWith(originalEnvelope) + Kamon.clear + } + case None => pjp.proceedWith(originalEnvelope) + } } - case _ => proceed + case _ => pjp.proceed } } } diff --git a/src/main/scala/kamon/instrumentation/AspectJPimps.scala b/src/main/scala/kamon/instrumentation/AspectJPimps.scala new file mode 100644 index 00000000..0663e801 --- /dev/null +++ b/src/main/scala/kamon/instrumentation/AspectJPimps.scala @@ -0,0 +1,19 @@ +package kamon.instrumentation + +import org.aspectj.lang.ProceedingJoinPoint + +trait ProceedingJoinPointPimp { + import language.implicitConversions + + implicit def pimpProceedingJointPoint(pjp: ProceedingJoinPoint) = RichProceedingJointPoint(pjp) +} + +object ProceedingJoinPointPimp extends ProceedingJoinPointPimp + +case class RichProceedingJointPoint(pjp: ProceedingJoinPoint) { + def proceedWith(newUniqueArg: AnyRef) = { + val args = pjp.getArgs + args.update(0, newUniqueArg) + pjp.proceed(args) + } +} diff --git a/src/main/scala/kamon/instrumentation/DispatcherInstrumentation.scala b/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala index 35e06b5d..f3ee4ee7 100644 --- a/src/main/scala/kamon/instrumentation/DispatcherInstrumentation.scala +++ b/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala @@ -1,12 +1,11 @@ -package akka.dispatch +package kamon.instrumentation import org.aspectj.lang.annotation._ import java.util.concurrent._ -import scala.concurrent.forkjoin.ForkJoinPool import org.aspectj.lang.ProceedingJoinPoint import java.util -import akka.dispatch.NamedExecutorServiceFactoryDelegate import kamon.metric.{MetricDirectory, ExecutorServiceMetricCollector} +import akka.dispatch.{MonitorableThreadFactory, ExecutorServiceFactory} case class NamedExecutorServiceFactoryDelegate(actorSystemName: String, dispatcherName: String, delegate: ExecutorServiceFactory) extends ExecutorServiceFactory { @@ -21,7 +20,7 @@ class ExecutorServiceFactoryProviderInstrumentation { @Around("factoryMethodCall(id, threadFactory)") def enrichFactoryCreationWithNames(pjp: ProceedingJoinPoint, id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = { - val delegate = pjp.proceed(Array[AnyRef](id, threadFactory)).asInstanceOf[ExecutorServiceFactory] // Safe Cast + val delegate = pjp.proceed().asInstanceOf[ExecutorServiceFactory] // Safe Cast val actorSystemName = threadFactory match { case m: MonitorableThreadFactory => m.name @@ -42,7 +41,7 @@ class NamedExecutorServiceFactoryDelegateInstrumentation { @Around("factoryMethodCall(namedFactory)") def enrichExecutorServiceWithMetricNameRoot(pjp: ProceedingJoinPoint, namedFactory: NamedExecutorServiceFactoryDelegate): ExecutorService = { - val delegate = pjp.proceed(Array[AnyRef](namedFactory)).asInstanceOf[ExecutorService] + val delegate = pjp.proceed().asInstanceOf[ExecutorService] val executorFullName = MetricDirectory.nameForDispatcher(namedFactory.actorSystemName, namedFactory.dispatcherName) ExecutorServiceMetricCollector.register(executorFullName, delegate) diff --git a/src/main/scala/akka/instrumentation/MessageQueueMetrics.scala b/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala index a7f5cdc8..75d6189c 100644 --- a/src/main/scala/akka/instrumentation/MessageQueueMetrics.scala +++ b/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala @@ -1,4 +1,4 @@ -package akka.instrumentation +package kamon.instrumentation import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram} import akka.dispatch.{Envelope, MessageQueue} @@ -25,40 +25,42 @@ class MessageQueueInstrumentation { val delegate = pjp.proceed.asInstanceOf[MessageQueue] // We are not interested in monitoring mailboxes if we don't know where they belong to. - val monitoredMailbox = for(own <- owner; sys <- system) yield new MonitoredMessageQueue(delegate, own, sys) + val monitoredMailbox = for(own <- owner; sys <- system) yield { + val systemName = sys.name + val ownerName = MetricDirectory.nameForActor(own) + val mailBoxName = MetricDirectory.nameForMailbox(systemName, ownerName) + + val queueSizeHistogram = new Histogram(new ExponentiallyDecayingReservoir()) + Metrics.include(mailBoxName, queueSizeHistogram) + + new MonitoredMessageQueue(delegate, queueSizeHistogram) + } monitoredMailbox match { case None => delegate case Some(mmb) => mmb } - } } -class MonitoredMessageQueue(val delegate: MessageQueue, owner: ActorRef, system: ActorSystem) extends MessageQueue { - val queueSizeHistogram: Histogram = new Histogram(new ExponentiallyDecayingReservoir) - - val fullName = MetricDirectory.nameForMailbox(system.name, MetricDirectory.nameForActor(owner)) - Metrics.registry.register(fullName, queueSizeHistogram) +class MonitoredMessageQueue(val delegate: MessageQueue, val queueSizeHistogram: Histogram) extends MessageQueue { def enqueue(receiver: ActorRef, handle: Envelope) = { - queueSizeHistogram.update(numberOfMessages) delegate.enqueue(receiver, handle) + queueSizeHistogram.update(numberOfMessages) } def dequeue(): Envelope = { + val envelope = delegate.dequeue() queueSizeHistogram.update(numberOfMessages) - delegate.dequeue() + + envelope } def numberOfMessages: Int = delegate.numberOfMessages def hasMessages: Boolean = delegate.hasMessages - def cleanUp(owner: ActorRef, deadLetters: MessageQueue) = { - Metrics.deregister(fullName) - - delegate.cleanUp(owner, deadLetters) - } + def cleanUp(owner: ActorRef, deadLetters: MessageQueue) = delegate.cleanUp(owner, deadLetters) } diff --git a/src/main/scala/kamon/metric/Metrics.scala b/src/main/scala/kamon/metric/Metrics.scala index ebf4fd2b..30b8bda9 100644 --- a/src/main/scala/kamon/metric/Metrics.scala +++ b/src/main/scala/kamon/metric/Metrics.scala @@ -4,7 +4,14 @@ import java.util.concurrent.TimeUnit import com.codahale.metrics._ import akka.actor.ActorRef -object Metrics { +trait MetricDepot { + def include(name: String, metric: Metric): Unit + def exclude(name: String): Unit +} + + + +object Metrics extends MetricDepot { val registry: MetricRegistry = new MetricRegistry val consoleReporter = ConsoleReporter.forRegistry(registry) @@ -14,6 +21,16 @@ object Metrics { consoleReporter.build().start(60, TimeUnit.SECONDS) + def include(name: String, metric: Metric) = registry.register(name, metric) + + def exclude(name: String) = { + registry.removeMatching(new MetricFilter { + def matches(name: String, metric: Metric): Boolean = name.startsWith(name) + }) + } + + + def deregister(fullName: String) = { registry.removeMatching(new MetricFilter { def matches(name: String, metric: Metric): Boolean = name.startsWith(fullName) diff --git a/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala b/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala index c3606d23..15e967f2 100644 --- a/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala +++ b/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala @@ -5,10 +5,11 @@ import org.scalatest.matchers.{ShouldMatchers, MustMatchers} import akka.actor.{Actor, Props, ActorSystem} import kamon.metric.Metrics._ import scala.collection.JavaConverters._ +import akka.testkit.TestActorRef class ActorInstrumentationSpec extends WordSpec with MustMatchers with ShouldMatchers { - val system = ActorSystem() + implicit val system = ActorSystem() import system._ val echoRef = actorOf(Props(new EchoActor), "Echo-Actor") @@ -18,16 +19,18 @@ class ActorInstrumentationSpec extends WordSpec with MustMatchers with ShouldMat "an instrumented Actor" should { "send a message and record a metric on the Metrics Registry with the number of sent messages" in { + val echoActor = TestActorRef[EchoActor] + + + (1 to totalMessages).foreach {x:Int => - echoRef ! s"Message ${x}" + echoActor ! s"Message ${x}" } - //to ensure that all messages was received - Thread.sleep(1000) - - val messages = registry.getMeters.asScala.get(meterForEchoActor).get.getCount + println("After all") + //val messages = registry.getMeters.asScala.get(meterForEchoActor).get.getCount - messages should equal(totalMessages) + //messages should equal(totalMessages) } } @@ -35,7 +38,7 @@ class ActorInstrumentationSpec extends WordSpec with MustMatchers with ShouldMat class EchoActor extends Actor { def receive = { - case msg ⇒ sender ! msg + case msg ⇒ println("SOME"); sender ! msg } } diff --git a/src/test/scala/kamon/instrumentation/MessageQueueInstrumentationSpec.scala b/src/test/scala/kamon/instrumentation/MessageQueueInstrumentationSpec.scala new file mode 100644 index 00000000..d672b975 --- /dev/null +++ b/src/test/scala/kamon/instrumentation/MessageQueueInstrumentationSpec.scala @@ -0,0 +1,53 @@ +package kamon.instrumentation + +import org.scalatest.WordSpec +import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram} +import java.util.concurrent.ConcurrentLinkedQueue +import akka.dispatch.{UnboundedMessageQueueSemantics, QueueBasedMessageQueue, Envelope} +import java.util.Queue +import akka.actor.{ActorSystem, Actor} + +class MessageQueueInstrumentationSpec(val actorSystem: ActorSystem) extends WordSpec { + def this() = this(ActorSystem("MessageQueueInstrumentationSpec")) + + + "A MonitoredMessageQueue" should { + "update the related histogram when a message is enqueued" in { + new PopulatedMessageQueueFixture { + + assert(histogram.getSnapshot.getMax === 0) + + for(i <- 1 to 3) { enqueueDummyMessage } + + assert(histogram.getCount === 3) + assert(histogram.getSnapshot.getMax === 3) + assert(histogram.getSnapshot.getMin === 1) + } + } + + "update the related histogram when a message is dequeued" in { + new PopulatedMessageQueueFixture { + for(i <- 1 to 3) { enqueueDummyMessage } + assert(histogram.getSnapshot.getMax === 3) + + messageQueue.dequeue() + messageQueue.dequeue() + + assert(histogram.getCount === 5) + assert(histogram.getSnapshot.getMax === 3) + assert(histogram.getSnapshot.getMin === 1) + } + } + } + + trait PopulatedMessageQueueFixture { + + val histogram = new Histogram(new ExponentiallyDecayingReservoir()) + val delegate = new ConcurrentLinkedQueue[Envelope]() with QueueBasedMessageQueue with UnboundedMessageQueueSemantics { + final def queue: Queue[Envelope] = this + } + val messageQueue = new MonitoredMessageQueue(delegate, histogram) + + def enqueueDummyMessage = messageQueue.enqueue(Actor.noSender, Envelope("", Actor.noSender, actorSystem)) + } +} diff --git a/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala b/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala index 4fe9e617..359766b6 100644 --- a/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala +++ b/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala @@ -12,7 +12,7 @@ import java.util.concurrent.TimeUnit import akka.actor.ActorSystem -class RunnableInstrumentationSpec extends WordSpec with MustMatchers with ScalaFutures with PatienceConfiguration with OptionValues { +class RunnableInstrumentationSpec extends WordSpec with MustMatchers with ScalaFuturesSupport with PatienceConfiguration with OptionValues { "a instrumented runnable" when { "created in a thread that does have a TraceContext" must { diff --git a/src/test/scala/kamon/instrumentation/ScalaFutures.scala b/src/test/scala/kamon/instrumentation/ScalaFuturesSupport.scala index 169b709c..cc87a7c8 100644 --- a/src/test/scala/kamon/instrumentation/ScalaFutures.scala +++ b/src/test/scala/kamon/instrumentation/ScalaFuturesSupport.scala @@ -6,7 +6,7 @@ import scala.util.{Failure, Success} import org.scalatest.concurrent.Futures import java.util.concurrent.TimeUnit -trait ScalaFutures extends Futures { +trait ScalaFuturesSupport extends Futures { implicit def scalaFutureToFutureConcept[T](future: Future[T]): FutureConcept[T] = new FutureConcept[T] { def eitherValue: Option[Either[Throwable, T]] = { if(!future.isCompleted) |