From e94894c5ec60a1102884894c4e73b441c79c70bc Mon Sep 17 00:00:00 2001 From: Diego Date: Sat, 5 Apr 2014 19:48:34 -0300 Subject: + kamon-core: added UnboundedMailboxInstrumentation to avoid O(n) time in .numberOfMessages --- kamon-core/src/main/resources/META-INF/aop.xml | 3 ++ .../UnboundedMailboxInstrumentation.scala | 58 ++++++++++++++++++++++ 2 files changed, 61 insertions(+) create mode 100644 kamon-core/src/main/scala/akka/instrumentation/UnboundedMailboxInstrumentation.scala diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop.xml index 180d905b..27a047b0 100644 --- a/kamon-core/src/main/resources/META-INF/aop.xml +++ b/kamon-core/src/main/resources/META-INF/aop.xml @@ -16,6 +16,9 @@ + + + diff --git a/kamon-core/src/main/scala/akka/instrumentation/UnboundedMailboxInstrumentation.scala b/kamon-core/src/main/scala/akka/instrumentation/UnboundedMailboxInstrumentation.scala new file mode 100644 index 00000000..6a335dfa --- /dev/null +++ b/kamon-core/src/main/scala/akka/instrumentation/UnboundedMailboxInstrumentation.scala @@ -0,0 +1,58 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ +package akka.instrumentation + +import org.aspectj.lang.annotation._ +import java.util.concurrent.atomic.AtomicInteger +import org.aspectj.lang.ProceedingJoinPoint + +/** + * This aspect adds a counter to the message queue to get O(1) time in the .numberOfMessages method implementation + */ +@Aspect +class UnboundedMailboxInstrumentation { + + @DeclareMixin("akka.dispatch.UnboundedMailbox$MessageQueue") + def mixinMailboxToMailboxSizeAware: MessageQueueSizeCounting = new MessageQueueSizeCounting {} + + @Pointcut("execution(* akka.dispatch.UnboundedMailbox$MessageQueue.enqueue(..)) && this(queue)") + def enqueuePointcut(queue: MessageQueueSizeCounting): Unit = {} + + @Pointcut("execution(* akka.dispatch.UnboundedMailbox$MessageQueue.dequeue()) && this(queue)") + def dequeuePointcut(queue: MessageQueueSizeCounting): Unit = {} + + @Pointcut("execution(* akka.dispatch.UnboundedMailbox$MessageQueue.numberOfMessages()) && this(queue)") + def numberOfMessagesPointcut(queue: MessageQueueSizeCounting): Unit = {} + + @After("dequeuePointcut(queue)") + def afterDequeuePointcut(queue: MessageQueueSizeCounting): Unit = { + if (queue.queueSize.get() > 0) queue.queueSize.decrementAndGet() + } + + @After("enqueuePointcut(queue)") + def afterEnqueue(queue: MessageQueueSizeCounting): Unit = { + queue.queueSize.incrementAndGet() + } + + @Around("numberOfMessagesPointcut(queue)") + def aroundNumberOfMessages(pjp: ProceedingJoinPoint, queue: MessageQueueSizeCounting): Any = { + queue.queueSize.get() + } +} + +trait MessageQueueSizeCounting { + val queueSize = new AtomicInteger +} -- cgit v1.2.3